1
0
Fork 0
mirror of synced 2024-10-05 20:53:27 +13:00

Merge pull request #7941 from appwrite/feat-reclaim-only-current-connection

Reclaim only current connection
This commit is contained in:
Jake Barnby 2024-04-10 17:08:53 +12:00 committed by GitHub
commit d6be999c12
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 154 additions and 99 deletions

View file

@ -2,6 +2,7 @@
require_once __DIR__ . '/../vendor/autoload.php';
use Appwrite\Utopia\Pools\Connections;
use Appwrite\Utopia\Response;
use Swoole\Process;
use Swoole\Http\Server;
@ -323,27 +324,13 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
$swooleResponse->end(\json_encode($output));
} finally {
$connectionForConsole = $app->getResource('connectionForConsole');
$connectionForProject = $app->getResource('connectionForProject');
$connectionForQueue = $app->getResource('connectionForQueue');
$connectionsForCache = $app->getResource('connectionsForCache');
/**
* @var Connections $connections
*/
$connections = $app->getResource('connections');
if (!is_null($connectionForConsole)) {
$connectionForConsole->reclaim();
}
if (!is_null($connectionForProject)) {
$connectionForProject->reclaim();
}
if (!is_null($connectionForQueue)) {
$connectionForQueue->reclaim();
}
if (!empty($connectionsForCache)) {
foreach ($connectionsForCache as $connection) {
$connection->reclaim();
}
if (!empty($connections)) {
$connections->reclaim();
}
}
});

View file

@ -33,6 +33,7 @@ use Appwrite\Network\Validator\Email;
use Appwrite\Network\Validator\Origin;
use Appwrite\OpenSSL\OpenSSL;
use Appwrite\URL\URL as AppwriteURL;
use Appwrite\Utopia\Pools\Connections;
use Utopia\App;
use Utopia\Logger\Logger;
use Utopia\Cache\Adapter\Redis as RedisCache;
@ -881,17 +882,14 @@ App::setResource('locale', fn() => new Locale(App::getEnv('_APP_LOCALE', 'en')))
App::setResource('localeCodes', function () {
return array_map(fn($locale) => $locale['code'], Config::getParam('locale-codes', []));
});
// Queues
App::setResource('queue', function (Group $pools) {
App::setResource('queue', function (Group $pools, Connections $connections) {
$connection = $pools->get('queue')->pop();
App::setResource('connectionForQueue', function () use ($connection) {
return $connection;
});
$connections->add($connection);
return $connection->getResource();
}, ['pools']);
}, ['pools', 'connections']);
App::setResource('queueForMessaging', function (Connection $queue) {
return new Phone($queue);
}, ['queue']);
@ -1132,20 +1130,11 @@ App::setResource('console', function () {
]);
}, []);
App::setResource('connectionForProject', function () {
return null;
});
App::setResource('connectionForConsole', function () {
return null;
});
App::setResource('connectionForQueue', function () {
return null;
});
App::setResource('connectionsForCache', function () {
return [];
App::setResource('connections', function () {
return new Connections();
});
App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project) {
App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project, Connections $connections) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
@ -1154,9 +1143,7 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole,
->get($project->getAttribute('database'))
->pop();
App::setResource('connectionForProject', function () use ($connection) {
return $connection;
});
$connections->add($connection);
$dbAdapter = $connection->getResource();
@ -1169,16 +1156,14 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole,
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
}, ['pools', 'dbForConsole', 'cache', 'project']);
}, ['pools', 'dbForConsole', 'cache', 'project', 'connections']);
App::setResource('dbForConsole', function (Group $pools, Cache $cache) {
App::setResource('dbForConsole', function (Group $pools, Cache $cache, Connections $connections) {
$connection = $pools
->get('console')
->pop();
App::setResource('connectionForConsole', function () use ($connection) {
return $connection;
});
$connections->add($connection);
$dbAdapter = $connection->getResource();
@ -1191,12 +1176,12 @@ App::setResource('dbForConsole', function (Group $pools, Cache $cache) {
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
}, ['pools', 'cache']);
}, ['pools', 'cache', 'connections']);
App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, Cache $cache, Connections $connections) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) {
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, $connections, &$databases) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
@ -1208,8 +1193,6 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId())
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
@ -1219,46 +1202,38 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
->get($databaseName)
->pop();
$dbAdapter = $connection->getResource();
$connections->add($connection);
App::setResource('connectionForProject', function () use ($connection) {
return $connection;
}, []);
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId())
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
return $database;
};
return $getProjectDB;
}, ['pools', 'dbForConsole', 'cache']);
}, ['pools', 'dbForConsole', 'cache', 'connections']);
App::setResource('cache', function (Group $pools) {
App::setResource('cache', function (Group $pools, Connections $connections) {
$list = Config::getParam('pools-cache', []);
$adapters = [];
$connections = [];
foreach ($list as $value) {
$connection = $pools
->get($value)
->pop();
$connections[] = $connection;
$connections->add($connection);
$adapters[] = $connection->getResource();
}
App::setResource('connectionsForCache', function () use ($connections) {
return $connections;
}, []);
return new Cache(new Sharding($adapters));
}, ['pools']);
}, ['pools', 'connections']);
App::setResource('deviceLocal', function () {
return new Local();

View file

@ -38,6 +38,8 @@ require_once __DIR__ . '/init.php';
Runtime::enableCoroutine();
$redisConnections = [];
// Allows overriding
if (!function_exists('getConsoleDB')) {
/**
@ -60,11 +62,7 @@ if (!function_exists('getConsoleDB')) {
[$cache, $reclaimCache] = getCache();
$database = new Database($dbAdapter, $cache);
$database
->setNamespace('_console')
->setMetadata('host', \gethostname())
->setMetadata('project', '_console');
$database->setNamespace('_console');
return [$database, function () use ($dbConnection, $reclaimCache) {
$dbConnection->reclaim();
@ -100,11 +98,7 @@ if (!function_exists('getProjectDB')) {
[$cache, $reclaimCache] = getCache();
$database = new Database($dbAdapter, $cache);
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId());
$database->setNamespace('_' . $project->getInternalId());
return [$database, function () use ($dbConnection, $reclaimCache) {
$dbConnection->reclaim();
@ -325,7 +319,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
});
});
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) {
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, &$redisConnections) {
Console::success('Worker ' . $workerId . ' started successfully');
$attempts = 0;
@ -436,6 +430,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
*/
[$redis, $reclaimForRedis] = getPubSub();
$redisConnections[$workerId] = [$redis, $reclaimForRedis];
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) {
@ -514,9 +510,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
sleep(DATABASE_RECONNECT_SLEEP);
continue;
} finally {
if (isset($reclaimForRedis)) {
$reclaimForRedis();
}
if (isset($reclaimForConsole)) {
$reclaimForConsole();
}
@ -529,6 +522,20 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
Console::error('Failed to restart pub/sub...');
});
$server->onWorkerStop(function (int $workerId) use ($redisConnections) {
/**
* @var Redis $redis
* @var callable $reclaim
*/
[$redis, $reclaim] = $redisConnections[$workerId] ?? null;
$redis?->unsubscribe(['realtime']);
if ($reclaim) {
$reclaim();
}
});
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime) {
$app = new App('UTC');
$request = new Request($request);

View file

@ -69,7 +69,7 @@
"utopia-php/storage": "0.18.*",
"utopia-php/swoole": "0.5.*",
"utopia-php/vcs": "0.6.*",
"utopia-php/websocket": "0.1.*",
"utopia-php/websocket": "dev-feat-worker-stop",
"matomo/device-detector": "6.1.*",
"dragonmantank/cron-expression": "3.3.2",
"phpmailer/phpmailer": "6.8.0",

30
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "e90287b00089ad8e8059e6e157f3a6c8",
"content-hash": "ce7a7f429d68836b8667e2b02440fc3f",
"packages": [
{
"name": "adhocore/jwt",
@ -2273,22 +2273,24 @@
},
{
"name": "utopia-php/websocket",
"version": "0.1.0",
"version": "dev-feat-worker-stop",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/websocket.git",
"reference": "51fcb86171400d8aa40d76c54593481fd273dab5"
"reference": "b7adfab69d4c48a60272fa4cb3328f9b400b0ffa"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/websocket/zipball/51fcb86171400d8aa40d76c54593481fd273dab5",
"reference": "51fcb86171400d8aa40d76c54593481fd273dab5",
"url": "https://api.github.com/repos/utopia-php/websocket/zipball/b7adfab69d4c48a60272fa4cb3328f9b400b0ffa",
"reference": "b7adfab69d4c48a60272fa4cb3328f9b400b0ffa",
"shasum": ""
},
"require": {
"php": ">=8.0"
},
"require-dev": {
"laravel/pint": "^1.15",
"phpstan/phpstan": "^1.8",
"phpunit/phpunit": "^9.5.5",
"swoole/ide-helper": "4.6.6",
"textalk/websocket": "1.5.2",
@ -2305,16 +2307,6 @@
"license": [
"MIT"
],
"authors": [
{
"name": "Eldad Fux",
"email": "eldad@appwrite.io"
},
{
"name": "Torsten Dittmann",
"email": "torsten@appwrite.io"
}
],
"description": "A simple abstraction for WebSocket servers.",
"keywords": [
"framework",
@ -2325,9 +2317,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/websocket/issues",
"source": "https://github.com/utopia-php/websocket/tree/0.1.0"
"source": "https://github.com/utopia-php/websocket/tree/feat-worker-stop"
},
"time": "2021-12-20T10:50:09+00:00"
"time": "2024-04-09T04:48:45+00:00"
},
{
"name": "webmozart/assert",
@ -5145,7 +5137,9 @@
],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": [],
"stability-flags": {
"utopia-php/websocket": 20
},
"prefer-stable": false,
"prefer-lowest": false,
"platform": {

View file

@ -0,0 +1,52 @@
<?php
namespace Appwrite\Utopia\Pools;
use Utopia\Pools\Connection;
class Connections
{
/**
* @var array<Connection>
*/
protected array $connections = [];
/**
* @param Connection $connection
* @return self
*/
public function add(Connection $connection): self
{
$this->connections[$connection->getID()] = $connection;
return $this;
}
/**
* @param string $id
* @return self
*/
public function remove(string $id): self
{
unset($this->connections[$id]);
return $this;
}
public function count(): int
{
return \count($this->connections);
}
/**
* @return self
* @throws \Exception
*/
public function reclaim(): self
{
foreach ($this->connections as $id => $connection) {
$connection->reclaim();
unset($this->connections[$id]);
}
return $this;
}
}

View file

@ -0,0 +1,40 @@
<?php
namespace Tests\Unit\Utopia\Pools;
use Appwrite\Utopia\Pools\Connections;
use PHPUnit\Framework\TestCase;
use Utopia\Pools\Connection;
use Utopia\Pools\Pool;
class ConnectionsTest extends TestCase
{
public function testAdd()
{
$connections = new Connections();
$connection = new Connection('resource');
$connections->add($connection);
$this->assertEquals(1, $connections->count());
}
public function testRemove()
{
$connections = new Connections();
$connection = new Connection('resource');
$connections->add($connection);
$connections->remove($connection->getID());
$this->assertEquals(0, $connections->count());
}
public function testReclaim()
{
$connections = new Connections();
$pool = new Pool('test', 1, function () {
return 'resource';
});
$connection = $pool->pop();
$connections->add($connection);
$connections->reclaim();
$this->assertEquals(1, $pool->count());
}
}