fix: add realtime worker
This commit is contained in:
parent
a0c43f1fe3
commit
afd40cae77
2 changed files with 1402 additions and 1335 deletions
119
app/realtime.php
119
app/realtime.php
|
@ -95,16 +95,33 @@ $server->error($logError);
|
|||
|
||||
function getDatabase(Registry &$register, string $namespace)
|
||||
{
|
||||
$attempts = 0;
|
||||
|
||||
$redis = $register->get('redisPool')->get();
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
|
||||
$consoleDB = $register->get('dbPool')->getConsoleDBFromPool();
|
||||
$db = $consoleDB;
|
||||
$dbName = '';
|
||||
if ($namespace != '_console') {
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($consoleDB), $cache);
|
||||
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$database->setNamespace('_console'); // Main DB
|
||||
|
||||
$project = $consoleDB->getDocument('projects', $namespace);
|
||||
$dbName = $project->getAttribute('database', '');
|
||||
if (!empty($dbName)) {
|
||||
$projectDB = $register->get('dbPool')->getDBFromPool($dbName);
|
||||
$db = $projectDB;
|
||||
}
|
||||
}
|
||||
|
||||
$attempts = 0;
|
||||
|
||||
do {
|
||||
try {
|
||||
$attempts++;
|
||||
|
||||
$db = $register->get('dbPool')->get();
|
||||
$redis = $register->get('redisPool')->get();
|
||||
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$database->setNamespace($namespace);
|
||||
|
@ -124,8 +141,13 @@ function getDatabase(Registry &$register, string $namespace)
|
|||
|
||||
return [
|
||||
$database,
|
||||
function () use ($register, $db, $redis) {
|
||||
$register->get('dbPool')->put($db);
|
||||
function () use ($register, $db, $dbName, $redis) {
|
||||
if (empty($dbName)) {
|
||||
$register->get('dbPool')->putConsoleDb($db);
|
||||
} else {
|
||||
$register->get('dbPool')->put($db, $dbName);
|
||||
}
|
||||
|
||||
$register->get('redisPool')->put($redis);
|
||||
}
|
||||
];
|
||||
|
@ -349,39 +371,55 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
|||
$response = new Response(new SwooleResponse());
|
||||
|
||||
/** @var PDO $db */
|
||||
$db = $register->get('dbPool')->get();
|
||||
$dbPool = $register->get('dbPool');
|
||||
$consoleDB = $dbPool->getConsoleDBFromPool();
|
||||
|
||||
/** @var Redis $redis */
|
||||
$redis = $register->get('redisPool')->get();
|
||||
|
||||
Console::info("Connection open (user: {$connection})");
|
||||
|
||||
App::setResource('db', fn () => $db);
|
||||
App::setResource('consoleDB', fn() => $consoleDB);
|
||||
App::setResource('cache', fn () => $redis);
|
||||
App::setResource('request', fn () => $request);
|
||||
App::setResource('response', fn () => $response);
|
||||
|
||||
try {
|
||||
/** @var \Utopia\Database\Document $user */
|
||||
$user = $app->getResource('user');
|
||||
|
||||
/** @var \Utopia\Database\Document $project */
|
||||
$project = $app->getResource('project');
|
||||
|
||||
/*
|
||||
* Project Check
|
||||
*/
|
||||
var_dump($project);
|
||||
if (empty($project->getId())) {
|
||||
throw new Exception('Missing or unknown project ID', 1008);
|
||||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
$projectDB = $consoleDB;
|
||||
if ($projectId !== 'console') {
|
||||
$dbForConsole = $app->getResource('dbForConsole'); /** @var Utopia\Database\Database $dbForConsole */
|
||||
$project = Authorization::skip(fn() => $dbForConsole->getDocument('projects', $projectId));
|
||||
$dbName = $project->getAttribute('database', '');
|
||||
if (!empty($dbName)) {
|
||||
$projectDB = $dbPool->getDBFromPool($dbName);
|
||||
}
|
||||
}
|
||||
|
||||
App::setResource('projectDB', fn() => $projectDB);
|
||||
|
||||
/** @var \Utopia\Database\Document $user */
|
||||
$user = $app->getResource('user');
|
||||
|
||||
/** @var \Utopia\Database\Document $console */
|
||||
$console = $app->getResource('console');
|
||||
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database = new Database(new MariaDB($projectDB), $cache);
|
||||
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$database->setNamespace("_{$project->getId()}");
|
||||
|
||||
/*
|
||||
* Project Check
|
||||
*/
|
||||
if (empty($project->getId())) {
|
||||
throw new Exception('Missing or unknown project ID', 1008);
|
||||
}
|
||||
|
||||
/*
|
||||
* Abuse Check
|
||||
*
|
||||
|
@ -466,7 +504,13 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
|||
/**
|
||||
* Put used PDO and Redis Connections back into their pools.
|
||||
*/
|
||||
$register->get('dbPool')->put($db);
|
||||
/** @var PDOPool $consolePool */
|
||||
$dbPool->putConsoleDb($consoleDB);
|
||||
|
||||
if (!empty($dbName) && !empty($projectDB)) {
|
||||
$dbPool->put($projectDB, $dbName);
|
||||
}
|
||||
|
||||
$register->get('redisPool')->put($redis);
|
||||
}
|
||||
});
|
||||
|
@ -474,13 +518,30 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
|||
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
|
||||
try {
|
||||
$response = new Response(new SwooleResponse());
|
||||
$db = $register->get('dbPool')->get();
|
||||
$redis = $register->get('redisPool')->get();
|
||||
|
||||
$dbPool = $register->get('dbPool');
|
||||
$consoleDB = $dbPool->getConsoleDBFromPool();
|
||||
|
||||
$redis = $register->get('redisPool')->get();
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
|
||||
|
||||
$projectId = $realtime->connections[$connection]['projectId'];
|
||||
$projectDB = $consoleDB;
|
||||
if ($projectId !== 'console') {
|
||||
$dbForConsole = new Database(new MariaDB($projectDB), $cache);
|
||||
$dbForConsole->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$dbForConsole->setNamespace("_console");
|
||||
$project = Authorization::skip(fn() => $dbForConsole->getDocument('projects', $projectId));
|
||||
$dbName = $project->getAttribute('database', '');
|
||||
if (!empty($dbName)) {
|
||||
$projectDB = $dbPool->getDBFromPool($dbName);
|
||||
}
|
||||
}
|
||||
|
||||
$database = new Database(new MariaDB($projectDB), $cache);
|
||||
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
|
||||
$database->setNamespace("_{$realtime->connections[$connection]['projectId']}");
|
||||
$database->setNamespace("_$projectId");
|
||||
|
||||
/*
|
||||
* Abuse Check
|
||||
|
@ -563,7 +624,13 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
|||
$server->close($connection, $th->getCode());
|
||||
}
|
||||
} finally {
|
||||
$register->get('dbPool')->put($db);
|
||||
/** @var PDOPool $consolePool */
|
||||
$dbPool->putConsoleDb($consoleDB);
|
||||
|
||||
if (!empty($dbName) && !empty($projectDB)) {
|
||||
$dbPool->put($projectDB, $dbName);
|
||||
}
|
||||
|
||||
$register->get('redisPool')->put($redis);
|
||||
}
|
||||
});
|
||||
|
|
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue