diff --git a/app/cli.php b/app/cli.php index edab558c9..1d0a41128 100644 --- a/app/cli.php +++ b/app/cli.php @@ -67,7 +67,9 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $databaseName = $project->getAttribute('database'); if (isset($databases[$databaseName])) { - return $databases[$databaseName]; + $database = $databases[$databaseName]; + $database->setNamespace('_' . $project->getInternalId()); + return $database; } $dbAdapter = $pools @@ -76,10 +78,11 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, ->getResource(); $database = new Database($dbAdapter, $cache); - $database->setNamespace('_' . $project->getInternalId()); $databases[$databaseName] = $database; + $database->setNamespace('_' . $project->getInternalId()); + return $database; }; @@ -158,7 +161,7 @@ $cli = $platform->getCli(); $cli ->error() ->inject('error') - ->action(function(Throwable $error) { + ->action(function (Throwable $error) { Console::error($error->getMessage()); }); @@ -166,7 +169,7 @@ $cli ->init() ->inject('pools') ->inject('cache') - ->action(function(Group $pools, Cache $cache) { + ->action(function (Group $pools, Cache $cache) { $maxAttempts = 5; $sleep = 3; @@ -189,7 +192,7 @@ $cli $collections = Config::getParam('collections', []); $last = \array_key_last($collections); - if($dbForConsole->exists($dbForConsole->getDefaultDatabase(), $last)) { + if ($dbForConsole->exists($dbForConsole->getDefaultDatabase(), $last)) { $ready = true; break; } @@ -197,7 +200,7 @@ $cli sleep($sleep); } while ($attempts < $maxAttempts); - if(!$ready) { + if (!$ready) { throw new Exception("Console is not ready yet. Please try again later."); } }); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index f65e65ba2..4ffa6771f 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -8,6 +8,8 @@ use Utopia\App; use Utopia\Config\Config; use Utopia\Database\Document; use Utopia\Pools\Group; +use Utopia\Queue\Client; +use Utopia\Queue\Connection; use Utopia\Registry\Registry; use Utopia\Storage\Device; use Utopia\Storage\Device\Local; @@ -396,10 +398,11 @@ App::get('/v1/health/queue/functions') ->label('sdk.response.code', Response::STATUS_CODE_OK) ->label('sdk.response.type', Response::CONTENT_TYPE_JSON) ->label('sdk.response.model', Response::MODEL_HEALTH_QUEUE) + ->inject('queueConnection') ->inject('response') - ->action(function (Response $response) { - - $response->dynamic(new Document([ 'size' => Resque::size(Event::FUNCTIONS_QUEUE_NAME) ]), Response::MODEL_HEALTH_QUEUE); + ->action(function (Connection $queueConnection, Response $response) { + $client = new Client(Event::FUNCTIONS_QUEUE_NAME, $queueConnection); + $response->dynamic(new Document([ 'size' => $client->sumProcessingJobs() ]), Response::MODEL_HEALTH_QUEUE); }, ['response']); App::get('/v1/health/storage/local') diff --git a/app/init.php b/app/init.php index 31f82740b..172fd845c 100644 --- a/app/init.php +++ b/app/init.php @@ -76,6 +76,7 @@ use MaxMind\Db\Reader; use PHPMailer\PHPMailer\PHPMailer; use Swoole\Database\PDOProxy; use Utopia\Queue; +use Utopia\Queue\Connection; const APP_NAME = 'Appwrite'; const APP_DOMAIN = 'appwrite.io'; @@ -852,9 +853,12 @@ App::setResource('mails', fn() => new Mail()); App::setResource('deletes', fn() => new Delete()); App::setResource('database', fn() => new EventDatabase()); App::setResource('messaging', fn() => new Phone()); -App::setResource('queueForFunctions', function (Group $pools) { - return new Func($pools->get('queue')->pop()->getResource()); +App::setResource('queueConnection', function (Group $pools) { + return $pools->get('queue')->pop()->getResource(); }, ['pools']); +App::setResource('queueForFunctions', function (Connection $queueConnection) { + return new Func($queueConnection); +}, ['queueConnection']); App::setResource('usage', function ($register) { return new Stats($register->get('statsd')); }, ['register']); diff --git a/app/worker.php b/app/worker.php index 42a5f9243..8ebd1b542 100644 --- a/app/worker.php +++ b/app/worker.php @@ -17,6 +17,7 @@ use Utopia\Queue\Server; use Utopia\Registry\Registry; use Utopia\Logger\Log; use Utopia\Logger\Logger; +use Utopia\Pools\Group; Runtime::enableCoroutine(SWOOLE_HOOK_ALL); @@ -55,7 +56,6 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register, $adapter = new Database($database, $cache); $adapter->setNamespace('_' . $project->getInternalId()); - return $adapter; }, ['cache', 'register', 'message', 'dbForConsole']); @@ -93,9 +93,14 @@ Server::setResource('statsd', function ($register) { return $register->get('statsd'); }, ['register']); +Server::setResource('pools', function ($register) { + return $register->get('pools'); +}, ['register']); + $pools = $register->get('pools'); $connection = $pools->get('queue')->pop()->getResource(); $workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)); +$workerNumber = 1; if (empty(App::getEnv('QUEUE'))) { throw new Exception('Please configure "QUEUE" environemnt variable.'); @@ -104,6 +109,13 @@ if (empty(App::getEnv('QUEUE'))) { $adapter = new Swoole($connection, $workerNumber, App::getEnv('QUEUE')); $server = new Server($adapter); +$server + ->shutdown() + ->inject('pools') + ->action(function (Group $pools) { + $pools->reclaim(); + }); + $server ->error() ->inject('error') diff --git a/app/workers/deletes.php b/app/workers/deletes.php index 3e0df8ce3..bfbb55008 100644 --- a/app/workers/deletes.php +++ b/app/workers/deletes.php @@ -276,30 +276,29 @@ class DeletesV1 extends Worker { $userId = $document->getId(); + $dbForProject = $this->getProjectDB($project); + // Delete all sessions of this user from the sessions table and update the sessions field of the user record $this->deleteByGroup('sessions', [ Query::equal('userId', [$userId]) - ], $this->getProjectDB($project)); + ], $dbForProject); - $this->getProjectDB($project)->deleteCachedDocument('users', $userId); + $dbForProject->deleteCachedDocument('users', $userId); // Delete Memberships and decrement team membership counts $this->deleteByGroup('memberships', [ Query::equal('userId', [$userId]) - ], $this->getProjectDB($project), function (Document $document) use ($project) { - + ], $dbForProject, function (Document $document) use ($dbForProject) { if ($document->getAttribute('confirm')) { // Count only confirmed members $teamId = $document->getAttribute('teamId'); - $team = $this->getProjectDB($project)->getDocument('teams', $teamId); + $team = $dbForProject->getDocument('teams', $teamId); if (!$team->isEmpty()) { - $team = $this - ->getProjectDB($project) - ->updateDocument( - 'teams', - $teamId, - // Ensure that total >= 0 + $team = $dbForProject->updateDocument( + 'teams', + $teamId, + // Ensure that total >= 0 $team->setAttribute('total', \max($team->getAttribute('total', 0) - 1, 0)) - ); + ); } } }); @@ -307,7 +306,7 @@ class DeletesV1 extends Worker // Delete tokens $this->deleteByGroup('tokens', [ Query::equal('userId', [$userId]) - ], $this->getProjectDB($project)); + ], $dbForProject); } /** diff --git a/composer.json b/composer.json index ffd9c6b6b..719596719 100644 --- a/composer.json +++ b/composer.json @@ -53,7 +53,7 @@ "utopia-php/domains": "1.1.*", "utopia-php/framework": "0.25.*", "utopia-php/image": "0.5.*", - "utopia-php/queue": "0.4.*", + "utopia-php/queue": "0.5.*", "utopia-php/locale": "0.4.*", "utopia-php/logger": "0.3.*", "utopia-php/orchestration": "0.9.*", diff --git a/composer.lock b/composer.lock index 517065af5..04dc73195 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "a673091aa6bd8ef01380b63245427c93", + "content-hash": "56cd96524e05cf30210256539cd7cb6a", "packages": [ { "name": "adhocore/jwt", @@ -2359,16 +2359,16 @@ }, { "name": "utopia-php/queue", - "version": "0.4.1", + "version": "dev-feat-connection-ping", "source": { "type": "git", "url": "https://github.com/utopia-php/queue.git", - "reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e" + "reference": "42e47dc8b40b2040ea9b70c644799d7d7a097520" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/queue/zipball/0b69ede484a04c567cbb202f592d8e5e3cd2433e", - "reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/42e47dc8b40b2040ea9b70c644799d7d7a097520", + "reference": "42e47dc8b40b2040ea9b70c644799d7d7a097520", "shasum": "" }, "require": { @@ -2414,9 +2414,9 @@ ], "support": { "issues": "https://github.com/utopia-php/queue/issues", - "source": "https://github.com/utopia-php/queue/tree/0.4.1" + "source": "https://github.com/utopia-php/queue/tree/feat-connection-ping" }, - "time": "2022-11-15T16:56:37+00:00" + "time": "2022-11-16T17:12:06+00:00" }, { "name": "utopia-php/registry", @@ -5276,9 +5276,18 @@ "time": "2022-09-28T08:42:51+00:00" } ], - "aliases": [], + "aliases": [ + { + "package": "utopia-php/queue", + "version": "dev-feat-connection-ping", + "alias": "0.4.0", + "alias_normalized": "0.4.0.0" + } + ], "minimum-stability": "stable", - "stability-flags": [], + "stability-flags": { + "utopia-php/queue": 20 + }, "prefer-stable": false, "prefer-lowest": false, "platform": { diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 6e3401e11..adb5772f9 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -116,9 +116,9 @@ class Event /** * Get project for this event. * - * @return Document + * @return ?Document */ - public function getProject(): Document + public function getProject(): ?Document { return $this->project; } @@ -139,9 +139,9 @@ class Event /** * Get project for this event. * - * @return Document + * @return ?Document */ - public function getUser(): Document + public function getUser(): ?Document { return $this->user; } diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 22940ad08..5f8b4c80c 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -2,9 +2,6 @@ namespace Appwrite\Event; -use DateTime; -use Resque; -use ResqueScheduler; use Utopia\Database\Document; use Utopia\Queue\Client; use Utopia\Queue\Connection; diff --git a/src/Appwrite/Resque/Worker.php b/src/Appwrite/Resque/Worker.php index 5d05d7757..bdb1ccb8d 100644 --- a/src/Appwrite/Resque/Worker.php +++ b/src/Appwrite/Resque/Worker.php @@ -169,6 +169,7 @@ abstract class Worker * @param Document $project * @return Database */ + protected static $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools protected function getProjectDB(Document $project): Database { global $register; @@ -179,6 +180,14 @@ abstract class Worker return $this->getConsoleDB(); } + $databaseName = $project->getAttribute('database'); + + if (isset(self::$databases[$databaseName])) { + $database = self::$databases[$databaseName]; + $database->setNamespace('_' . $project->getInternalId()); + return $database; + } + $dbAdapter = $pools ->get($project->getAttribute('database')) ->pop() @@ -186,6 +195,9 @@ abstract class Worker ; $database = new Database($dbAdapter, $this->getCache()); + + self::$databases[$databaseName] = $database; + $database->setNamespace('_' . $project->getInternalId()); return $database; diff --git a/tests/e2e/Services/Teams/TeamsBaseServer.php b/tests/e2e/Services/Teams/TeamsBaseServer.php index df508e5e3..c4d92ff0d 100644 --- a/tests/e2e/Services/Teams/TeamsBaseServer.php +++ b/tests/e2e/Services/Teams/TeamsBaseServer.php @@ -246,7 +246,6 @@ trait TeamsBaseServer 'x-appwrite-project' => $this->getProject()['$id'], ], $this->getHeaders())); - $this->assertEquals(200, $response['headers']['status-code']); $this->assertNotEmpty($response['body']['$id']); $this->assertEquals('Arsenal', $response['body']['name']);