1
0
Fork 0
mirror of synced 2024-06-02 10:54:44 +12:00

Merge branch 'refactor-usage-sn' of github.com:appwrite/appwrite into refactor-usage-sn

This commit is contained in:
Matej Bačo 2024-04-16 09:59:24 +02:00
commit fd81c95e38
5 changed files with 161 additions and 433 deletions

View file

@ -2,7 +2,6 @@
require_once __DIR__ . '/../vendor/autoload.php';
use Appwrite\Utopia\Pools\Connections;
use Appwrite\Utopia\Response;
use Swoole\Process;
use Swoole\Http\Server;
@ -324,14 +323,7 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
$swooleResponse->end(\json_encode($output));
} finally {
/**
* @var Connections $connections
*/
$connections = $app->getResource('connections');
if (!empty($connections)) {
$connections->reclaim();
}
$pools->reclaim();
}
});

View file

@ -33,7 +33,6 @@ use Appwrite\Network\Validator\Email;
use Appwrite\Network\Validator\Origin;
use Appwrite\OpenSSL\OpenSSL;
use Appwrite\URL\URL as AppwriteURL;
use Appwrite\Utopia\Pools\Connections;
use Utopia\App;
use Utopia\Logger\Logger;
use Utopia\Cache\Adapter\Redis as RedisCache;
@ -882,14 +881,11 @@ App::setResource('locale', fn() => new Locale(App::getEnv('_APP_LOCALE', 'en')))
App::setResource('localeCodes', function () {
return array_map(fn($locale) => $locale['code'], Config::getParam('locale-codes', []));
});
// Queues
App::setResource('queue', function (Group $pools, Connections $connections) {
$connection = $pools->get('queue')->pop();
$connections->add($connection);
return $connection->getResource();
}, ['pools', 'connections']);
App::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
App::setResource('queueForMessaging', function (Connection $queue) {
return new Phone($queue);
}, ['queue']);
@ -1130,22 +1126,15 @@ App::setResource('console', function () {
]);
}, []);
App::setResource('connections', function () {
return new Connections();
});
App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project, Connections $connections) {
App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$connection = $pools
$dbAdapter = $pools
->get($project->getAttribute('database'))
->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
->pop()
->getResource();
$database = new Database($dbAdapter, $cache);
@ -1156,16 +1145,14 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole,
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
}, ['pools', 'dbForConsole', 'cache', 'project', 'connections']);
}, ['pools', 'dbForConsole', 'cache', 'project']);
App::setResource('dbForConsole', function (Group $pools, Cache $cache, Connections $connections) {
$connection = $pools
App::setResource('dbForConsole', function (Group $pools, Cache $cache) {
$dbAdapter = $pools
->get('console')
->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
->pop()
->getResource()
;
$database = new Database($dbAdapter, $cache);
@ -1176,12 +1163,12 @@ App::setResource('dbForConsole', function (Group $pools, Cache $cache, Connectio
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
}, ['pools', 'cache', 'connections']);
}, ['pools', 'cache']);
App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, Cache $cache, Connections $connections) {
App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, $connections, &$databases) {
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
@ -1193,47 +1180,48 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId())
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
}
$connection = $pools
$dbAdapter = $pools
->get($databaseName)
->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
->pop()
->getResource();
$database = new Database($dbAdapter, $cache);
$databases[$databaseName] = $database;
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId())
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
};
return $getProjectDB;
}, ['pools', 'dbForConsole', 'cache', 'connections']);
}, ['pools', 'dbForConsole', 'cache']);
App::setResource('cache', function (Group $pools, Connections $connections) {
App::setResource('cache', function (Group $pools) {
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$connection = $pools
$adapters[] = $pools
->get($value)
->pop();
$connections->add($connection);
$adapters[] = $connection->getResource();
->pop()
->getResource()
;
}
return new Cache(new Sharding($adapters));
}, ['pools', 'connections']);
}, ['pools']);
App::setResource('deviceLocal', function () {
return new Local();

View file

@ -26,145 +26,92 @@ use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Registry\Registry;
use Utopia\WebSocket\Server;
use Utopia\WebSocket\Adapter;
/**
* @var Registry $register
* @var \Utopia\Registry\Registry $register
*/
require_once __DIR__ . '/init.php';
Runtime::enableCoroutine();
$redisConnections = [];
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
// Allows overriding
if (!function_exists('getConsoleDB')) {
/**
* @return array{Database, callable}
* @throws Exception|\Exception
*/
function getConsoleDB(): array
if (!function_exists("getConsoleDB")) {
function getConsoleDB(): Database
{
global $register;
/** @var Group $pools */
/** @var \Utopia\Pools\Group $pools */
$pools = $register->get('pools');
$dbConnection = $pools
$dbAdapter = $pools
->get('console')
->pop();
->pop()
->getResource()
;
$dbAdapter = $dbConnection->getResource();
$database = new Database($dbAdapter, getCache());
[$cache, $reclaimCache] = getCache();
$database
->setNamespace('_console')
->setMetadata('host', \gethostname())
->setMetadata('project', '_console');
$database = new Database($dbAdapter, $cache);
$database->setNamespace('_console');
return [$database, function () use ($dbConnection, $reclaimCache) {
$dbConnection->reclaim();
$reclaimCache();
}];
return $database;
}
}
// Allows overriding
if (!function_exists('getProjectDB')) {
/**
* @param Document $project
* @return array{Database, callable}
* @throws Exception
*/
function getProjectDB(Document $project): array
if (!function_exists("getProjectDB")) {
function getProjectDB(Document $project): Database
{
global $register;
/** @var Group $pools */
/** @var \Utopia\Pools\Group $pools */
$pools = $register->get('pools');
if ($project->isEmpty() || $project->getId() === 'console') {
return getConsoleDB();
}
$dbConnection = $pools
$dbAdapter = $pools
->get($project->getAttribute('database'))
->pop();
->pop()
->getResource()
;
$dbAdapter = $dbConnection->getResource();
$database = new Database($dbAdapter, getCache());
[$cache, $reclaimCache] = getCache();
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId());
$database = new Database($dbAdapter, $cache);
$database->setNamespace('_' . $project->getInternalId());
return [$database, function () use ($dbConnection, $reclaimCache) {
$dbConnection->reclaim();
$reclaimCache();
}];
return $database;
}
}
// Allows overriding
if (!function_exists('getCache')) {
/**
* @return array{Cache, callable}
* @throws Exception|\Exception
*/
function getCache(): array
if (!function_exists("getCache")) {
function getCache(): Cache
{
global $register;
/** @var Group $pools */
$pools = $register->get('pools');
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
$list = Config::getParam('pools-cache', []);
$connections = [];
$adapters = [];
foreach ($list as $value) {
$connection = $pools
$adapters[] = $pools
->get($value)
->pop();
$connections[] = $connection;
$adapters[] = $connection->getResource();
->pop()
->getResource()
;
}
$cache = new Cache(new Sharding($adapters));
return [$cache, function () use ($connections) {
foreach ($connections as $connection) {
$connection->reclaim();
}
}];
}
}
if (!function_exists('getPubSub')) {
/**
* @return array{Redis, callable}
* @throws Exception|\Exception
*/
function getPubSub(): array
{
global $register;
/** @var Group $pools */
$pools = $register->get('pools');
$connection = $pools
->get('pubsub')
->pop();
$redis = $connection->getResource();
return [$redis, function () use ($connection) {
$connection->reclaim();
}];
return new Cache(new Sharding($adapters));
}
}
@ -186,17 +133,13 @@ $statsDocument = null;
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80));
$adapter
->setPackageMaxLength(64000) // Default maximum Package Size (64kb)
->setWorkerNumber($workerNumber);
$server = new Server($adapter);
function logError(Throwable $error, string $action): void
{
global $register;
$logError = function (Throwable $error, string $action) use ($register) {
$logger = $register->get('logger');
if ($logger && !$error instanceof Exception) {
@ -230,13 +173,11 @@ function logError(Throwable $error, string $action): void
Console::error('[Error] Message: ' . $error->getMessage());
Console::error('[Error] File: ' . $error->getFile());
Console::error('[Error] Line: ' . $error->getLine());
}
};
$server->error(function (Throwable $th, string $method) {
logError($th, $method);
});
$server->error($logError);
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument) {
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) {
sleep(5); // wait for the initial database schema to be ready
Console::success('Server started successfully');
@ -245,17 +186,11 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
*/
go(function () use ($register, $containerId, &$statsDocument) {
$attempts = 0;
$database = getConsoleDB();
do {
try {
/**
* @var Database $database
* @var callable $reclaim
*/
[$database, $reclaim] = getConsoleDB();
$attempts++;
$document = new Document([
'$id' => ID::unique(),
'$collection' => ID::custom('realtime'),
@ -265,131 +200,102 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
'value' => '{}'
]);
$statsDocument = Authorization::skip(function () use ($database, $document) {
return $database->createDocument('realtime', $document);
});
$statsDocument = Authorization::skip(fn () => $database->createDocument('realtime', $document));
break;
} catch (Throwable) {
Console::warning("Collection not ready. Retrying connection ({$attempts})...");
sleep(DATABASE_RECONNECT_SLEEP);
}
} while (true);
if (isset($reclaim)) {
$reclaim();
}
$register->get('pools')->reclaim();
});
/**
* Save current connections to the Database every 5 seconds.
*/
Timer::tick(5000, function () use ($register, $stats, &$statsDocument) {
$payload = [];
// Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
// $payload = [];
// foreach ($stats as $projectId => $value) {
// $payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
// }
// if (empty($payload) || empty($statsDocument)) {
// return;
// }
foreach ($stats as $projectId => $value) {
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
}
// try {
// $database = getConsoleDB();
if (empty($payload) || empty($statsDocument)) {
return;
}
// $statsDocument
// ->setAttribute('timestamp', DateTime::now())
// ->setAttribute('value', json_encode($payload));
try {
/**
* @var Database $database
* @var callable $reclaim
*/
[$database, $reclaim] = getConsoleDB();
$statsDocument
->setAttribute('timestamp', DateTime::now())
->setAttribute('value', json_encode($payload));
Authorization::skip(function () use ($database, $statsDocument) {
$database->updateDocument('realtime', $statsDocument->getId(), $statsDocument);
});
} catch (Throwable $th) {
logError($th, 'updateWorkerDocument');
} finally {
if (isset($reclaim)) {
$reclaim();
}
}
});
// Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
// } catch (Throwable $th) {
// call_user_func($logError, $th, "updateWorkerDocument");
// } finally {
// $register->get('pools')->reclaim();
// }
// });
});
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, &$redisConnections) {
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
Console::success('Worker ' . $workerId . ' started successfully');
$attempts = 0;
$start = time();
Timer::tick(5000, function () use ($server, $register, $realtime, $stats) {
Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError) {
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
try {
/**
* @var Database $database
* @var callable $reclaim
*/
[$database, $reclaim] = getConsoleDB();
$database = getConsoleDB();
$payload = [];
$payload = [];
$list = Authorization::skip(function () use ($database) {
return $database->find('realtime', [
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
]);
});
$list = Authorization::skip(fn () => $database->find('realtime', [
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
]));
/**
* Aggregate stats across containers.
*/
foreach ($list as $document) {
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
}
/**
* Aggregate stats across containers.
*/
foreach ($list as $document) {
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
}
}
foreach ($stats as $projectId => $value) {
if (!array_key_exists($projectId, $payload)) {
continue;
}
$event = [
'project' => 'console',
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
'data' => [
'events' => ['stats.connections'],
'channels' => ['project'],
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => [
$projectId => $payload[$projectId]
]
]
];
$server->send($realtime->getSubscribers($event), json_encode([
'type' => 'event',
'data' => $event['data']
]));
}
} catch (Throwable $th) {
logError($th, 'sendStats');
} finally {
if (isset($reclaim)) {
$reclaim();
}
}
}
foreach ($stats as $projectId => $value) {
if (!array_key_exists($projectId, $payload)) {
continue;
}
$event = [
'project' => 'console',
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
'data' => [
'events' => ['stats.connections'],
'channels' => ['project'],
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => [
$projectId => $payload[$projectId]
]
]
];
$server->send($realtime->getSubscribers($event), json_encode([
'type' => 'event',
'data' => $event['data']
]));
}
$register->get('pools')->reclaim();
}
/**
* Sending test message for SDK E2E tests every 5 seconds.
*/
@ -421,17 +327,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
sleep(5); // 5 sec delay between connection attempts
}
$start = time();
/**
* @var Redis $redis
* @var callable $reclaimForRedis
*/
[$redis, $reclaimForRedis] = getPubSub();
$redisConnections[$workerId] = [$redis, $reclaimForRedis];
$redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) {
@ -450,36 +348,17 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
$consoleDatabase = getConsoleDB();
$project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId));
$database = getProjectDB($project);
/**
* @var Database $dbForConsole
* @var Database $dbForProject
* @var callable $reclaimForConsole
* @var callable $reclaimForProject
*/
[$dbForConsole, $reclaimForConsole] = getConsoleDB();
$project = Authorization::skip(function () use ($dbForConsole, $projectId) {
return $dbForConsole->getDocument('projects', $projectId);
});
[$dbForProject, $reclaimForProject] = getProjectDB($project);
$user = $dbForProject->getDocument('users', $userId);
$user = $database->getDocument('users', $userId);
$roles = Auth::getRoles($user);
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
/**
* If we successfully reclaim, clear the callbacks
* so the finally block doesn't try to reclaim again.
*/
$reclaimForConsole();
$reclaimForConsole = null;
$reclaimForProject();
$reclaimForProject = null;
$register->get('pools')->reclaim();
}
}
@ -504,54 +383,30 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
}
});
} catch (Throwable $th) {
logError($th, 'pubSubConnection');
call_user_func($logError, $th, "pubSubConnection");
Console::error('Pub/sub error: ' . $th->getMessage());
$attempts++;
sleep(DATABASE_RECONNECT_SLEEP);
continue;
} finally {
if (isset($reclaimForConsole)) {
$reclaimForConsole();
}
if (isset($reclaimForProject)) {
$reclaimForProject();
}
$register->get('pools')->reclaim();
}
}
Console::error('Failed to restart pub/sub...');
});
$server->onWorkerStop(function (int $workerId) use ($redisConnections) {
/**
* @var Redis $redis
* @var callable $reclaim
*/
[$redis, $reclaim] = $redisConnections[$workerId] ?? null;
$redis?->unsubscribe(['realtime']);
if ($reclaim) {
$reclaim();
}
});
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime) {
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $logError) {
$app = new App('UTC');
$request = new Request($request);
$response = new Response(new SwooleResponse());
Console::info("Connection open (user: {$connection})");
App::setResource('pools', function () use ($register) {
return $register->get('pools');
});
App::setResource('request', function () use ($request) {
return $request;
});
App::setResource('response', function () use ($response) {
return $response;
});
App::setResource('pools', fn() => $register->get('pools'));
App::setResource('request', fn() => $request);
App::setResource('response', fn() => $response);
try {
/** @var Document $project */
@ -564,13 +419,9 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing or unknown project ID');
}
[$dbForProject, $reclaimForProject] = getProjectDB($project);
/** @var Document $console */
$console = $app->getResource('console');
/** @var Document $user */
$user = $app->getResource('user');
$dbForProject = getProjectDB($project);
$console = $app->getResource('console'); /** @var Document $console */
$user = $app->getResource('user'); /** @var Document $user */
/*
* Abuse Check
@ -630,7 +481,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
$stats->incr($project->getId(), 'connections');
$stats->incr($project->getId(), 'connectionsTotal');
} catch (Throwable $th) {
logError($th, 'initServer');
call_user_func($logError, $th, "initServer");
$response = [
'type' => 'error',
@ -649,9 +500,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
Console::error('[Error] Message: ' . $response['data']['message']);
}
} finally {
if (isset($reclaimForProject)) {
$reclaimForProject();
}
$register->get('pools')->reclaim();
}
});
@ -659,14 +508,11 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
try {
$response = new Response(new SwooleResponse());
$projectId = $realtime->connections[$connection]['projectId'];
[$database, $reclaimForConsole] = getConsoleDB();
$database = getConsoleDB();
if ($projectId !== 'console') {
$project = Authorization::skip(function () use ($database, $projectId) {
return $database->getDocument('projects', $projectId);
});
[$database, $reclaimForProject] = getProjectDB($project);
$project = Authorization::skip(fn() => $database->getDocument('projects', $projectId));
$database = getProjectDB($project);
} else {
$project = null;
}
@ -752,12 +598,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
$server->close($connection, $th->getCode());
}
} finally {
if (isset($reclaimForConsole)) {
$reclaimForConsole();
}
if (isset($reclaimForProject)) {
$reclaimForProject();
}
$register->get('pools')->reclaim();
}
});
@ -765,7 +606,6 @@ $server->onClose(function (int $connection) use ($realtime, $stats) {
if (array_key_exists($connection, $realtime->connections)) {
$stats->decr($realtime->connections[$connection]['projectId'], 'connectionsTotal');
}
$realtime->unsubscribe($connection);
Console::info('Connection close: ' . $connection);

View file

@ -1,52 +0,0 @@
<?php
namespace Appwrite\Utopia\Pools;
use Utopia\Pools\Connection;
class Connections
{
/**
* @var array<Connection>
*/
protected array $connections = [];
/**
* @param Connection $connection
* @return self
*/
public function add(Connection $connection): self
{
$this->connections[$connection->getID()] = $connection;
return $this;
}
/**
* @param string $id
* @return self
*/
public function remove(string $id): self
{
unset($this->connections[$id]);
return $this;
}
public function count(): int
{
return \count($this->connections);
}
/**
* @return self
* @throws \Exception
*/
public function reclaim(): self
{
foreach ($this->connections as $id => $connection) {
$connection->reclaim();
unset($this->connections[$id]);
}
return $this;
}
}

View file

@ -1,40 +0,0 @@
<?php
namespace Tests\Unit\Utopia\Pools;
use Appwrite\Utopia\Pools\Connections;
use PHPUnit\Framework\TestCase;
use Utopia\Pools\Connection;
use Utopia\Pools\Pool;
class ConnectionsTest extends TestCase
{
public function testAdd()
{
$connections = new Connections();
$connection = new Connection('resource');
$connections->add($connection);
$this->assertEquals(1, $connections->count());
}
public function testRemove()
{
$connections = new Connections();
$connection = new Connection('resource');
$connections->add($connection);
$connections->remove($connection->getID());
$this->assertEquals(0, $connections->count());
}
public function testReclaim()
{
$connections = new Connections();
$pool = new Pool('test', 1, function () {
return 'resource';
});
$connection = $pool->pop();
$connections->add($connection);
$connections->reclaim();
$this->assertEquals(1, $pool->count());
}
}