1
0
Fork 0
mirror of synced 2024-09-28 23:41:23 +12:00

feat: Adding DI to realtime

This commit is contained in:
Binyamin Yawitz 2024-06-04 13:12:10 -04:00
parent 0093ad7799
commit f7d9efc18e
No known key found for this signature in database

View file

@ -5,6 +5,7 @@ use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Network\Validator\Origin;
use Appwrite\Utopia\Queue\Connections;
use Appwrite\Utopia\Request;
use Appwrite\Utopia\Response;
use Swoole\Http\Request as SwooleRequest;
@ -14,10 +15,11 @@ use Swoole\Table;
use Swoole\Timer;
use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Adapter\None;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter\MariaDB;
use Utopia\Database\Adapter\MySQL;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
@ -25,93 +27,80 @@ use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Http\Adapter\FPM\Server as FPMServer;
use Utopia\DI\Container;
use Utopia\DI\Dependency;
use Utopia\Http\Adapter\Swoole\Request as UtopiaRequest;
use Utopia\Http\Adapter\Swoole\Response as UtopiaResponse;
use Utopia\Http\Http;
use Utopia\Logger\Log;
use Utopia\Registry\Registry;
use Utopia\System\System;
use Utopia\WebSocket\Adapter;
use Utopia\WebSocket\Server;
global $global;
/**
* @var \Utopia\Registry\Registry $global
* @var Registry $global
* @var Container $container
*/
global $global, $container;
require_once __DIR__ . '/init2.php';
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
function getConsoleDB(Authorization $auth): Database
{
global $global;
$auth = new Dependency();
$cache = new Dependency();
$getProjectDB = new Dependency();
/** @var \Utopia\Pools\Group $pools */
$pools = $global->get('pools');
$auth
->setName('auth')
->setCallback(fn () => new Authorization());
$dbAdapter = $pools
->get('console')
->pop()
->getResource()
;
$getProjectDB
->setName('getProjectDB')
->inject('pools')
->inject('dbForConsole')
->inject('cache')
->inject('auth')
->inject('connections')
->setCallback(function (array $pools, Database $dbForConsole, Cache $cache, Authorization $auth, Connections $connections) {
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth, $connections): Database {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$database = new Database($dbAdapter, getCache());
$database->setAuthorization($auth);
$databaseName = $project->getAttribute('database');
$database
->setNamespace('_console')
->setMetadata('host', \gethostname())
->setMetadata('project', '_console');
$pool = $pools['pools-database-' . $databaseName]['pool'];
$dsn = $pools['pools-database-' . $databaseName]['dsn'];
return $database;
}
$connection = $pool->get();
$connections->add($connection, $pool);
$adapter = match ($dsn->getScheme()) {
'mariadb' => new MariaDB($connection),
'mysql' => new MySQL($connection),
default => null
};
$adapter->setDatabase($dsn->getPath());
function getProjectDB(Document $project, Authorization $auth): Database
{
global $global;
$database = new Database($adapter, $cache);
$database->setAuthorization($auth);
$database->setNamespace('_' . $project->getInternalId());
/** @var \Utopia\Pools\Group $pools */
$pools = $global->get('pools');
return $database;
};
});
if ($project->isEmpty() || $project->getId() === 'console') {
return getConsoleDB($auth);
}
$cache
->setName('cache')
->setCallback(function () {
return new Cache(new None());
});
$dbAdapter = $pools
->get($project->getAttribute('database'))
->pop()
->getResource()
;
$database = new Database($dbAdapter, getCache());
$database->setAuthorization($auth);
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId());
return $database;
}
function getCache(): Cache
{
global $global;
$pools = $global->get('pools'); /** @var \Utopia\Pools\Group $pools */
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
}
return new Cache(new Sharding($adapters));
}
$container->set($auth);
$container->set($cache);
$container->set($getProjectDB);
$realtime = new Realtime();
@ -175,18 +164,16 @@ $logError = function (Throwable $error, string $action) use ($global) {
$server->error($logError);
$server->onStart(function () use ($stats, $global, $containerId, &$statsDocument, $logError) {
$auth = new Authorization();
$server->onStart(function () use ($stats, $container, $containerId, &$statsDocument, $logError) {
sleep(5); // wait for the initial database schema to be ready
Console::success('Server started successfully');
/**
* Create document for this worker to share stats across Containers.
*/
go(function () use ($global, $containerId, &$statsDocument, $auth) {
go(function () use ($container, $containerId, &$statsDocument) {
$attempts = 0;
$database = getConsoleDB($auth);
$database = $container->get('dbForConsole');
do {
try {
@ -208,13 +195,13 @@ $server->onStart(function () use ($stats, $global, $containerId, &$statsDocument
sleep(DATABASE_RECONNECT_SLEEP);
}
} while (true);
$global->get('pools')->reclaim();
// TODO NOW $global->get('pools')->reclaim();
});
/**
* Save current connections to the Database every 5 seconds.
*/
Timer::tick(5000, function () use ($global, $stats, &$statsDocument, $logError, $auth) {
Timer::tick(5000, function () use ($container, $stats, &$statsDocument, $logError) {
$payload = [];
foreach ($stats as $projectId => $value) {
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
@ -224,7 +211,7 @@ $server->onStart(function () use ($stats, $global, $containerId, &$statsDocument
}
try {
$database = getConsoleDB($auth);
$database = $container->get('dbForConsole');
$statsDocument
->setAttribute('timestamp', DateTime::now())
@ -235,12 +222,12 @@ $server->onStart(function () use ($stats, $global, $containerId, &$statsDocument
} catch (Throwable $th) {
call_user_func($logError, $th, "updateWorkerDocument");
} finally {
$global->get('pools')->reclaim();
// TODO NOW $global->get('pools')->reclaim();
}
});
});
$server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $realtime, $logError) {
$server->onWorkerStart(function (int $workerId) use ($server, $container, $stats, $realtime, $logError) {
Console::success('Worker ' . $workerId . ' started successfully');
$attempts = 0;
@ -248,12 +235,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $
$auth = new Authorization();
Timer::tick(5000, function () use ($server, $global, $realtime, $stats, $logError, $auth) {
Timer::tick(5000, function () use ($server, $container, $realtime, $stats, $logError, $auth) {
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
$database = getConsoleDB($auth);
$database = $container->get('dbForConsole');
$payload = [];
@ -267,9 +254,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $
foreach ($list as $document) {
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
$payload[$projectId] = $value;
}
}
}
@ -297,8 +284,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $
'data' => $event['data']
]));
}
$global->get('pools')->reclaim();
// TODO NOW $global->get('pools')->reclaim();
}
/**
* Sending test message for SDK E2E tests every 5 seconds.
@ -327,14 +313,22 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $
while ($attempts < 300) {
try {
if ($attempts > 0) {
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
Console::error(
'Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
Attempting restart in 5 seconds (attempt #' . $attempts . ')'
);
sleep(5); // 5 sec delay between connection attempts
}
$start = time();
$redis = $global->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */
$pools = $container->get('pools');
$pool = $pools['pools-pubsub-main']['pool'];
$dsn = $pools['pools-pubsub-main']['dsn'];
$redis = new \Redis();
$redis->connect($dsn->getHost(), $dsn->getPort());
/** @var Redis $redis */
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) {
@ -344,7 +338,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $global, $realtime, $auth) {
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $realtime, $auth, $container) {
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {
@ -353,18 +347,17 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
$consoleDatabase = getConsoleDB($auth);
$consoleDatabase = $container->get('dbForConsole');
$auth = new Authorization();
$project = $auth->skip(fn () => $consoleDatabase->getDocument('projects', $projectId));
$database = getProjectDB($project, $auth);
$database = $container->get('getProjectDB')($project);
$user = $database->getDocument('users', $userId);
$roles = Auth::getRoles($user, $auth);
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
$global->get('pools')->reclaim();
//TODO NOW $global->get('pools')->reclaim();
}
}
@ -404,25 +397,29 @@ $server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $
Console::error('Failed to restart pub/sub...');
});
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $global, $stats, &$realtime, $logError) {
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $container, $stats, &$realtime, $logError) {
$auth = new Authorization();
$http = new Http(new FPMServer(), 'UTC');
$request = new Request($request);
$response = new Response(new SwooleResponse());
$request = new Request(new UtopiaRequest($request));
$response = new Response(new UtopiaResponse(new SwooleResponse()));
$requestInjection = new Dependency();
$responseInjection = new Dependency();
$requestInjection->setName('request')->setCallback(fn () => $request);
$responseInjection->setName('response')->setCallback(fn () => $response);
$container->set($requestInjection);
$container->set($responseInjection);
Console::info("Connection open (user: {$connection})");
Http::setResource('pools', fn () => $global->get('pools'));
Http::setResource('request', fn () => $request);
Http::setResource('response', fn () => $response);
try {
/** @var Document $project */
$project = $http->getResource('project');
$project = $container->get('project');
/*
* Project Check
* Project Check
*/
if (empty($project->getId())) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing or unknown project ID');
@ -437,9 +434,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
throw new AppwriteException(AppwriteException::GENERAL_API_DISABLED);
}
$dbForProject = getProjectDB($project, $auth);
$console = $http->getResource('console'); /** @var Document $console */
$user = $http->getResource('user'); /** @var Document $user */
$dbForProject = $container->get('getProjectDB')($project);
$console = $container->get('console');
/** @var Document $console */
$user = $container->get('user');
/** @var Document $user */
/*
* Abuse Check
@ -519,22 +518,20 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
Console::error('[Error] Message: ' . $response['data']['message']);
}
} finally {
$global->get('pools')->reclaim();
// TODO NOW $global->get('pools')->reclaim();
}
});
$server->onMessage(function (int $connection, string $message) use ($server, $global, $realtime, $containerId) {
$auth = new Authorization();
$server->onMessage(function (int $connection, string $message) use ($server, $container, $realtime, $containerId) {
try {
$response = new Response(new SwooleResponse());
$projectId = $realtime->connections[$connection]['projectId'];
$database = getConsoleDB($auth);
$database = $container->get('dbForConsole');
if ($projectId !== 'console') {
$auth = new Authorization();
$project = $auth->skip(fn () => $database->getDocument('projects', $projectId));
$database = getProjectDB($project, $auth);
$database = $container->get('getProjectDB')($project);
} else {
$project = null;
}
@ -619,7 +616,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $gl
$server->close($connection, $th->getCode());
}
} finally {
$global->get('pools')->reclaim();
// TODO NOW $global->get('pools')->reclaim();
}
});