1
0
Fork 0
mirror of synced 2024-09-21 20:11:15 +12:00

Fix connections pool

This commit is contained in:
Matej Bačo 2024-03-09 11:59:45 +01:00
parent 4de95913a5
commit 2a0a69f3ed
14 changed files with 231 additions and 140 deletions

View file

@ -8,6 +8,7 @@ use Appwrite\Event\Delete;
use Appwrite\Event\Func;
use Appwrite\Event\Hamster;
use Appwrite\Platform\Appwrite;
use Appwrite\Utopia\Queue\Connections;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\CLI;
@ -27,39 +28,41 @@ global $register;
CLI::setResource('register', fn () => $register);
CLI::setResource('cache', function ($pools) {
CLI::setResource('connections', function () {
return new Connections();
});
CLI::setResource('cache', function ($pools, Connections $connections) {
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
$connection = $pools->get($value)->pop();
$connections->add($connection);
$adapters[] = $connection->getResource();
}
return new Cache(new Sharding($adapters));
}, ['pools']);
}, ['pools', 'connections']);
CLI::setResource('pools', function (Registry $register) {
return $register->get('pools');
}, ['register']);
CLI::setResource('dbForConsole', function ($pools, $cache, $auth) {
CLI::setResource('dbForConsole', function ($pools, $cache, $auth, Connections $connections) {
$sleep = 3;
$maxAttempts = 5;
$attempts = 0;
$ready = false;
$connection = null;
do {
$attempts++;
try {
// Prepare database connection
$dbAdapter = $pools
->get('console')
->pop()
->getResource();
$connection = $pools->get('console')->pop();
$dbAdapter = $connection->getResource();
$dbForConsole = new Database($dbAdapter, $cache);
$dbForConsole->setAuthorization($auth);
@ -79,23 +82,31 @@ CLI::setResource('dbForConsole', function ($pools, $cache, $auth) {
$ready = true;
} catch (\Throwable $err) {
if($connection !== null) {
$connection->reclaim();
$connection = null;
}
Console::warning($err->getMessage());
$pools->get('console')->reclaim();
sleep($sleep);
}
} while ($attempts < $maxAttempts && !$ready);
if($connection !== null) {
$connections->add($connection);
}
if (!$ready) {
throw new Exception("Console is not ready yet. Please try again later.");
}
return $dbForConsole;
}, ['pools', 'cache', 'auth']);
}, ['pools', 'cache', 'auth', 'connections']);
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache, $auth) {
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache, $auth, Connections $connections) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth) {
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth, $connections) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
@ -108,10 +119,9 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
return $database;
}
$dbAdapter = $pools
->get($databaseName)
->pop()
->getResource();
$connection = $pools->get($databaseName)->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
$database->setAuthorization($auth);
@ -125,11 +135,13 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
return $database;
};
}, ['pools', 'dbForConsole', 'cache', 'auth']);
}, ['pools', 'dbForConsole', 'cache', 'auth', 'connections']);
CLI::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
CLI::setResource('queue', function (Group $pools, Connections $connections) {
$connection = $pools->get('queue')->pop();
$connections->add($connection);
return $connection->getResource();
}, ['pools', 'connections']);
CLI::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue);
}, ['queue']);

View file

@ -3,6 +3,7 @@
use Appwrite\ClamAV\Network;
use Appwrite\Event\Event;
use Appwrite\Extend\Exception;
use Appwrite\Utopia\Queue\Connections;
use Appwrite\Utopia\Response;
use Utopia\Config\Config;
use Utopia\Database\Document;
@ -69,7 +70,8 @@ Http::get('/v1/health/db')
->label('sdk.response.model', Response::MODEL_HEALTH_STATUS)
->inject('response')
->inject('pools')
->action(function (Response $response, Group $pools) {
->inject('connections')
->action(function (Response $response, Group $pools, Connections $connections) {
$output = [];
@ -81,7 +83,9 @@ Http::get('/v1/health/db')
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
$adapter = $pools->get($database)->pop()->getResource();
$connection = $pools->get($database)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
$checkStart = \microtime(true);
@ -123,7 +127,8 @@ Http::get('/v1/health/cache')
->label('sdk.response.model', Response::MODEL_HEALTH_STATUS)
->inject('response')
->inject('pools')
->action(function (Response $response, Group $pools) {
->inject('connections')
->action(function (Response $response, Group $pools, Connections $connections) {
$output = [];
@ -134,7 +139,9 @@ Http::get('/v1/health/cache')
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
$adapter = $pools->get($database)->pop()->getResource();
$connection = $pools->get($database)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
$checkStart = \microtime(true);
@ -180,7 +187,8 @@ Http::get('/v1/health/queue')
->label('sdk.response.model', Response::MODEL_HEALTH_STATUS)
->inject('response')
->inject('pools')
->action(function (Response $response, Group $pools) {
->inject('connections')
->action(function (Response $response, Group $pools, Connections $connections) {
$output = [];
@ -191,7 +199,9 @@ Http::get('/v1/health/queue')
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
$adapter = $pools->get($database)->pop()->getResource();
$connection = $pools->get($database)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
$checkStart = \microtime(true);
@ -237,7 +247,8 @@ Http::get('/v1/health/pubsub')
->label('sdk.response.model', Response::MODEL_HEALTH_STATUS)
->inject('response')
->inject('pools')
->action(function (Response $response, Group $pools) {
->inject('connections')
->action(function (Response $response, Group $pools, Connections $connections) {
$output = [];
@ -248,7 +259,9 @@ Http::get('/v1/health/pubsub')
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
$adapter = $pools->get($database)->pop()->getResource();
$connection = $pools->get($database)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
$checkStart = \microtime(true);

View file

@ -10,6 +10,7 @@ use Appwrite\Network\Validator\Origin;
use Appwrite\Template\Template;
use Appwrite\Utopia\Database\Validator\ProjectId;
use Appwrite\Utopia\Database\Validator\Queries\Projects;
use Appwrite\Utopia\Queue\Connections;
use Appwrite\Utopia\Response;
use PHPMailer\PHPMailer\PHPMailer;
use Utopia\Abuse\Adapters\TimeLimit;
@ -78,7 +79,8 @@ Http::post('/v1/projects')
->inject('cache')
->inject('pools')
->inject('auth')
->action(function (string $projectId, string $name, string $teamId, string $region, string $description, string $logo, string $url, string $legalName, string $legalCountry, string $legalState, string $legalCity, string $legalAddress, string $legalTaxId, Response $response, Database $dbForConsole, Cache $cache, Group $pools, Authorization $auth) {
->inject('connections')
->action(function (string $projectId, string $name, string $teamId, string $region, string $description, string $logo, string $url, string $legalName, string $legalCountry, string $legalState, string $legalCity, string $legalAddress, string $legalTaxId, Response $response, Database $dbForConsole, Cache $cache, Group $pools, Authorization $auth, Connections $connections) {
$team = $dbForConsole->getDocument('teams', $teamId);
@ -178,7 +180,9 @@ Http::post('/v1/projects')
throw new Exception(Exception::PROJECT_ALREADY_EXISTS);
}
$dbForProject = new Database($pools->get($database)->pop()->getResource(), $cache);
$connection = $pools->get($database)->pop();
$connections->add($connection);
$dbForProject = new Database($connection->getResource(), $cache);
$dbForProject->setAuthorization($auth);
$dbForProject->setNamespace("_{$project->getInternalId()}");
$dbForProject->create();

View file

@ -12,6 +12,7 @@ use Appwrite\Event\Messaging;
use Appwrite\Event\Usage;
use Appwrite\Extend\Exception;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Utopia\Queue\Connections;
use Appwrite\Utopia\Request;
use Appwrite\Utopia\Response;
use Utopia\Abuse\Abuse;
@ -28,7 +29,6 @@ use Utopia\Database\Validator\Authorization\Input;
use Utopia\Http\Http;
use Utopia\Http\Validator\WhiteList;
use Utopia\Pools\Group;
use Utopia\Pools\Pool;
$parseLabel = function (string $label, array $responsePayload, array $requestParams, Document $user) {
preg_match_all('/{(.*?)}/', $label, $matches);
@ -749,14 +749,14 @@ Http::init()
});
Http::shutdown()
->inject('pools')
->action(function (Group $pools) {
$pools->reclaim();
->inject('connections')
->action(function (Connections $connections) {
$connections->reclaim();
});
Http::error()
->inject('pools')
->action(function (Group $pools) {
$pools->reclaim();
->inject('connections')
->action(function (Connections $connections) {
$connections->reclaim();
});

View file

@ -15,8 +15,6 @@ use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Permission;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Http\Adapter\Swoole\Request as SwooleRequest;
use Utopia\Http\Adapter\Swoole\Response as SwooleResponse;
use Utopia\Http\Adapter\Swoole\Server;
use Utopia\Http\Http;
use Utopia\Pools\Group;
@ -209,4 +207,3 @@ go(function () use ($register, $http, $payloadSize) {
$http->start();
});

View file

@ -40,6 +40,7 @@ use Appwrite\Network\Validator\Email;
use Appwrite\Network\Validator\Origin;
use Appwrite\OpenSSL\OpenSSL;
use Appwrite\URL\URL as AppwriteURL;
use Appwrite\Utopia\Queue\Connections;
use MaxMind\Db\Reader;
use PHPMailer\PHPMailer\PHPMailer;
use Swoole\Database\PDOProxy;
@ -1043,10 +1044,16 @@ Http::setResource('localeCodes', function () {
return array_map(fn ($locale) => $locale['code'], Config::getParam('locale-codes', []));
});
Http::setResource('connections', function () {
return new Connections();
});
// Queues
Http::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
Http::setResource('queue', function (Group $pools, Connections $connections) {
$connection = $pools->get('queue')->pop();
$connections->add($connection);
return $connection->getResource();
}, ['pools', 'connections']);
Http::setResource('queueForMessaging', function (Connection $queue) {
return new Messaging($queue);
}, ['queue']);
@ -1307,15 +1314,14 @@ Http::setResource('console', function () {
]);
}, []);
Http::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project, Authorization $auth) {
Http::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project, Authorization $auth, Connections $connections) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$dbAdapter = $pools
->get($project->getAttribute('database'))
->pop()
->getResource();
$connection = $pools->get($project->getAttribute('database'))->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
$database->setAuthorization($auth);
@ -1327,14 +1333,12 @@ Http::setResource('dbForProject', function (Group $pools, Database $dbForConsole
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
}, ['pools', 'dbForConsole', 'cache', 'project', 'auth']);
}, ['pools', 'dbForConsole', 'cache', 'project', 'auth', 'connections']);
Http::setResource('dbForConsole', function (Group $pools, Cache $cache, Authorization $auth) {
$dbAdapter = $pools
->get('console')
->pop()
->getResource()
;
Http::setResource('dbForConsole', function (Group $pools, Cache $cache, Authorization $auth, Connections $connections) {
$connection = $pools->get('console')->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
$database->setAuthorization($auth);
@ -1346,12 +1350,12 @@ Http::setResource('dbForConsole', function (Group $pools, Cache $cache, Authoriz
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
}, ['pools', 'cache', 'auth']);
}, ['pools', 'cache', 'auth', 'connections']);
Http::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache, Authorization $auth) {
Http::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache, Authorization $auth, Connections $connections) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth) {
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth, $connections) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
@ -1370,10 +1374,9 @@ Http::setResource('getProjectDB', function (Group $pools, Database $dbForConsole
return $database;
}
$dbAdapter = $pools
->get($databaseName)
->pop()
->getResource();
$connection = $pools->get($databaseName)->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
$database->setAuthorization($auth);
@ -1390,22 +1393,20 @@ Http::setResource('getProjectDB', function (Group $pools, Database $dbForConsole
};
return $getProjectDB;
}, ['pools', 'dbForConsole', 'cache', 'auth']);
}, ['pools', 'dbForConsole', 'cache', 'auth', 'connections']);
Http::setResource('cache', function (Group $pools) {
Http::setResource('cache', function (Group $pools, Connections $connections) {
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
$connection = $pools->get($value)->pop();
$connections->add($connection);
$adapters[] = $connection->getResource();
}
return new Cache(new Sharding($adapters));
}, ['pools']);
}, ['pools', 'connections']);
Http::setResource('deviceForLocal', function () {
return new Local();

View file

@ -16,6 +16,7 @@ use Appwrite\Event\Migration;
use Appwrite\Event\Usage;
use Appwrite\Event\UsageDump;
use Appwrite\Platform\Appwrite;
use Appwrite\Utopia\Queue\Connections;
use Swoole\Runtime;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
@ -42,19 +43,22 @@ Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
Server::setResource('register', fn () => $register);
Server::setResource('dbForConsole', function (Cache $cache, Registry $register, Authorization $auth) {
Server::setResource('connections', function () {
return new Connections();
});
Server::setResource('dbForConsole', function (Cache $cache, Registry $register, Authorization $auth, Connections $connections) {
$pools = $register->get('pools');
$database = $pools
->get('console')
->pop()
->getResource();
$connection = $pools->get('console')->pop();
$connections->add($connection);
$database = $connection->getResource();
$adapter = new Database($database, $cache);
$adapter->setAuthorization($auth);
$adapter->setNamespace('_console');
return $adapter;
}, ['cache', 'register', 'auth']);
}, ['cache', 'register', 'auth', 'connections']);
Server::setResource('project', function (Message $message, Database $dbForConsole) {
$payload = $message->getPayload() ?? [];
@ -67,27 +71,26 @@ Server::setResource('project', function (Message $message, Database $dbForConsol
return $dbForConsole->getDocument('projects', $project->getId());
}, ['message', 'dbForConsole']);
Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Document $project, Database $dbForConsole, Authorization $auth) {
Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Document $project, Database $dbForConsole, Authorization $auth, Connections $connections) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$pools = $register->get('pools');
$database = $pools
->get($project->getAttribute('database'))
->pop()
->getResource();
$connection = $pools->get($project->getAttribute('database'))->pop();
$connections->add($connection);
$database = $connection->getResource();
$adapter = new Database($database, $cache);
$adapter->setAuthorization($auth);
$adapter->setNamespace('_' . $project->getInternalId());
return $adapter;
}, ['cache', 'register', 'message', 'project', 'dbForConsole', 'auth']);
}, ['cache', 'register', 'message', 'project', 'dbForConsole', 'auth', 'connections']);
Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache, Authorization $auth) {
Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache, Authorization $auth, Connections $connections) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth): Database {
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth, $connections): Database {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
@ -100,10 +103,9 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForConso
return $database;
}
$dbAdapter = $pools
->get($databaseName)
->pop()
->getResource();
$connection = $pools->get($databaseName)->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
$database->setAuthorization($auth);
@ -114,7 +116,7 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForConso
return $database;
};
}, ['pools', 'dbForConsole', 'cache', 'auth']);
}, ['pools', 'dbForConsole', 'cache', 'auth', 'connections']);
Server::setResource('abuseRetention', function () {
return DateTime::addSeconds(new \DateTime(), -1 * Http::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', 86400));
@ -128,21 +130,19 @@ Server::setResource('executionRetention', function () {
return DateTime::addSeconds(new \DateTime(), -1 * Http::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', 1209600));
});
Server::setResource('cache', function (Registry $register) {
Server::setResource('cache', function (Registry $register, Connections $connections) {
$pools = $register->get('pools');
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
$connection = $pools->get($value)->pop();
$connections->add($connection);
$adapters[] = $connection->getResource();
}
return new Cache(new Sharding($adapters));
}, ['register']);
}, ['register', 'connections']);
Server::setResource('log', fn () => new Log());
@ -154,9 +154,11 @@ Server::setResource('queueForUsageDump', function (Connection $queue) {
return new UsageDump($queue);
}, ['queue']);
Server::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
Server::setResource('queue', function (Group $pools, Connections $connections) {
$connection = $pools->get('queue')->pop();
$connections->add($connection);
return $connection->getResource();
}, ['pools', 'connections']);
Server::setResource('queueForDatabase', function (Connection $queue) {
return new EventDatabase($queue);
@ -283,9 +285,9 @@ $worker
$worker
->shutdown()
->inject('pools')
->action(function (Group $pools) {
$pools->reclaim();
->inject('connections')
->action(function (Connections $connections) {
$connections->reclaim();
});
$worker
@ -293,11 +295,11 @@ $worker
->inject('error')
->inject('logger')
->inject('log')
->inject('pools')
->inject('connections')
->inject('project')
->inject('auth')
->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project, Authorization $auth) use ($queueName) {
$pools->reclaim();
->action(function (Throwable $error, ?Logger $logger, Log $log, Connections $connections, Document $project, Authorization $auth) use ($queueName) {
$connections->reclaim();
$version = Http::getEnv('_APP_VERSION', 'UNKNOWN');
if ($error instanceof PDOException) {

View file

@ -2,6 +2,7 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Utopia\Queue\Connections;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
@ -34,13 +35,14 @@ class DeleteOrphanedProjects extends Action
->inject('dbForConsole')
->inject('register')
->inject('auth')
->callback(function (bool $commit, Group $pools, Cache $cache, Database $dbForConsole, Registry $register, Authorization $auth) {
$this->action($commit, $pools, $cache, $dbForConsole, $register, $auth);
->inject('connections')
->callback(function (bool $commit, Group $pools, Cache $cache, Database $dbForConsole, Registry $register, Authorization $auth, Connections $connections) {
$this->action($commit, $pools, $cache, $dbForConsole, $register, $auth, $connections);
});
}
public function action(bool $commit, Group $pools, Cache $cache, Database $dbForConsole, Registry $register, Authorization $auth): void
public function action(bool $commit, Group $pools, Cache $cache, Database $dbForConsole, Registry $register, Authorization $auth, Connections $connections): void
{
Console::title('Delete orphaned projects V1');
@ -87,10 +89,9 @@ class DeleteOrphanedProjects extends Action
try {
$db = $project->getAttribute('database');
$adapter = $pools
->get($db)
->pop()
->getResource();
$connection = $pools->get($db)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
$dbForProject = new Database($adapter, $cache);
$dbForProject->setAuthorization($auth);

View file

@ -3,6 +3,7 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\ClamAV\Network;
use Appwrite\Utopia\Queue\Connections;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Domains\Domain;
@ -25,10 +26,11 @@ class Doctor extends Action
$this
->desc('Validate server health')
->inject('register')
->callback(fn (Registry $register) => $this->action($register));
->inject('connections')
->callback(fn (Registry $register, Connections $connections) => $this->action($register, $connections));
}
public function action(Registry $register): void
public function action(Registry $register, Connections $connections): void
{
Console::log(" __ ____ ____ _ _ ____ __ ____ ____ __ __
/ _\ ( _ \( _ \/ )( \( _ \( )(_ _)( __) ( )/ \
@ -126,7 +128,9 @@ class Doctor extends Action
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
$adapter = $pools->get($database)->pop()->getResource();
$connection = $pools->get($database)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
if ($adapter->ping()) {
Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected');
@ -149,7 +153,9 @@ class Doctor extends Action
foreach ($configs as $key => $config) {
foreach ($config as $pool) {
try {
$adapter = $pools->get($pool)->pop()->getResource();
$connection = $pools->get($pool)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
if ($adapter->ping()) {
Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected');

View file

@ -2,13 +2,13 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Utopia\Queue\Connections;
use League\Csv\CannotInsertRecord;
use League\Csv\Writer;
use PHPMailer\PHPMailer\PHPMailer;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\Exception\Authorization;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization as ValidatorAuthorization;
use Utopia\Http\Adapter\FPM\Server;
@ -50,8 +50,9 @@ class GetMigrationStats extends Action
->inject('dbForConsole')
->inject('register')
->inject('auth')
->callback(function (Group $pools, Cache $cache, Database $dbForConsole, Registry $register, ValidatorAuthorization $auth) {
$this->action($pools, $cache, $dbForConsole, $register, $auth);
->inject('connections')
->callback(function (Group $pools, Cache $cache, Database $dbForConsole, Registry $register, ValidatorAuthorization $auth, Connections $connections) {
$this->action($pools, $cache, $dbForConsole, $register, $auth, $connections);
});
}
@ -59,7 +60,7 @@ class GetMigrationStats extends Action
* @throws \Utopia\Exception
* @throws CannotInsertRecord
*/
public function action(Group $pools, Cache $cache, Database $dbForConsole, Registry $register, ValidatorAuthorization $auth): void
public function action(Group $pools, Cache $cache, Database $dbForConsole, Registry $register, ValidatorAuthorization $auth, Connections $connections): void
{
//docker compose exec -t appwrite get-migration-stats
@ -99,12 +100,11 @@ class GetMigrationStats extends Action
try {
$db = $project->getAttribute('database');
$adapter = $pools
->get($db)
->pop()
->getResource();
$connection = $pools->get($db)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
$dbForProject = new Database($adapter, $cache);
$dbForProject = new Database($adapter, $cache); // TODO: Use getProjectDB instead, or reclaim connections properly
$dbForProject->setAuthorization($auth);
$dbForProject->setDatabase('appwrite');
$dbForProject->setNamespace('_' . $project->getInternalId());

View file

@ -90,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase
->trigger();
}
$queue->reclaim();
$queue->reclaim(); // TODO: Do in try/catch/finally, or add to connectons resource
});
}

View file

@ -51,7 +51,7 @@ class ScheduleMessages extends ScheduleBase
$schedule['$id'],
);
$queue->reclaim();
$queue->reclaim(); // TODO: Do in try/catch/finally, or add to connectons resource
unset($this->schedules[$schedule['resourceId']]);
});

View file

@ -4,6 +4,7 @@ namespace Appwrite\Platform\Workers;
use Appwrite\Event\Hamster as EventHamster;
use Appwrite\Network\Validator\Origin;
use Appwrite\Utopia\Queue\Connections;
use Utopia\Analytics\Adapter\Mixpanel;
use Utopia\Analytics\Event as AnalyticsEvent;
use Utopia\Cache\Cache;
@ -54,7 +55,8 @@ class Hamster extends Action
->inject('cache')
->inject('dbForConsole')
->inject('auth')
->callback(fn (Message $message, Group $pools, Cache $cache, Database $dbForConsole, Authorization $auth) => $this->action($message, $pools, $cache, $dbForConsole, $auth));
->inject('connections')
->callback(fn (Message $message, Group $pools, Cache $cache, Database $dbForConsole, Authorization $auth, Connections $connections) => $this->action($message, $pools, $cache, $dbForConsole, $auth, $connections));
}
/**
@ -66,7 +68,7 @@ class Hamster extends Action
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Group $pools, Cache $cache, Database $dbForConsole, Authorization $auth): void
public function action(Message $message, Group $pools, Cache $cache, Database $dbForConsole, Authorization $auth, Connections $connections): void
{
$token = Http::getEnv('_APP_MIXPANEL_TOKEN', '');
if (empty($token)) {
@ -84,7 +86,7 @@ class Hamster extends Action
switch ($type) {
case EventHamster::TYPE_PROJECT:
$this->getStatsForProject(new Document($payload['project']), $pools, $cache, $dbForConsole, $auth);
$this->getStatsForProject(new Document($payload['project']), $pools, $cache, $dbForConsole, $auth, $connections);
break;
case EventHamster::TYPE_ORGANISATION:
$this->getStatsForOrganization(new Document($payload['organization']), $dbForConsole);
@ -102,7 +104,7 @@ class Hamster extends Action
* @param Database $dbForConsole
* @throws \Utopia\Database\Exception
*/
private function getStatsForProject(Document $project, Group $pools, Cache $cache, Database $dbForConsole, Authorization $auth): void
private function getStatsForProject(Document $project, Group $pools, Cache $cache, Database $dbForConsole, Authorization $auth, Connections $connections): void
{
/**
* Skip user projects with id 'console'
@ -116,12 +118,11 @@ class Hamster extends Action
try {
$db = $project->getAttribute('database');
$adapter = $pools
->get($db)
->pop()
->getResource();
$connection = $pools->get($db)->pop();
$connections->add($connection);
$adapter = $connection->getResource();
$dbForProject = new Database($adapter, $cache);
$dbForProject = new Database($adapter, $cache); // TODO: Use getProjectDB instead, or reclaim connections properly
$dbForProject->setAuthorization($auth);
$dbForProject->setDatabase('appwrite');
$dbForProject->setNamespace('_' . $project->getInternalId());

View file

@ -0,0 +1,54 @@
<?php
namespace Appwrite\Utopia\Queue;
use Utopia\Pools\Connection;
class Connections
{
/**
* @var Connection[]
*/
protected array $connections = [];
/**
* @param Connection $pool
* @return self
*/
public function add(Connection $connection): self
{
$this->connections[$connection->getID()] = $connection;
return $this;
}
/**
* @param string $id
* @return Connection
*/
public function get(string $id): Connection
{
return $this->connections[$id] ?? throw new \Exception("Connection '{$id}' not found");
}
/**
* @param string $id
* @return self
*/
public function remove(string $id): self
{
unset($this->connections[$id]);
return $this;
}
/**
* @return self
*/
public function reclaim(): self
{
foreach ($this->connections as $connection) {
$connection->reclaim();
}
return $this;
}
}