From 28eed022f6475941459e8a4d3a41693cf52e74b8 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Fri, 26 Feb 2021 10:21:07 +0100 Subject: [PATCH] add redis connection pool --- app/realtime.php | 49 +++++++++++++++---------- src/Appwrite/Database/Adapter/Redis.php | 12 +++++- src/Appwrite/Realtime/Realtime.php | 2 + 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index c3eae789f8..2f2cbc82b3 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -21,16 +21,16 @@ use Utopia\Config\Config; use Utopia\Registry\Registry; use Utopia\Swoole\Request as SwooleRequest; use PDO as PDONative; +use Swoole\Database\RedisConfig; +use Swoole\Database\RedisPool; use Utopia\Abuse\Abuse; use Utopia\Abuse\Adapters\TimeLimit; /** * TODO List * - * - Abuse Control / x mesages per connection * - CORS Validation * - Limit payload size - * - Message structure: { status: "ok"|"error", event: EVENT_NAME, data: } * - JWT Authentication (in path / or in message) * * Protocols Support: @@ -43,10 +43,13 @@ use Utopia\Abuse\Adapters\TimeLimit; ini_set('default_socket_timeout', -1); Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL); -$server = new Server("0.0.0.0", 80); +$server = new Server('0.0.0.0', 80); + $server->set([ - 'worker_num' => 1 + 'websocket_compression' => true, + 'package_max_length' => 81920 ]); + $subscriptions = []; $connections = []; @@ -71,27 +74,33 @@ $register->set('db', function () { // Register DB connection return $pdo; }); -$register->set('cache', function () { // Register cache connection - $redis = new Redis(); - $redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); +$register->set('cache', function () use (&$pool) { // Register cache connection + $config = new RedisConfig(); + $user = App::getEnv('_APP_REDIS_USER', ''); $pass = App::getEnv('_APP_REDIS_PASS', ''); - $auth = []; + $auth = ''; + if (!empty($user)) { - $auth["user"] = $user; + $auth = $user; } if (!empty($pass)) { - $auth["pass"] = $pass; + $auth += ':' . $pass; } if (!empty($auth)) { - $redis->auth($auth); + $config->withAuth($auth); } - $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); + + $config + ->withHost(App::getEnv('_APP_REDIS_HOST', '')) + ->withPort(App::getEnv('_APP_REDIS_PORT', '')); + + $pool = new RedisPool($config); - return $redis; + return $pool; }); -$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections, &$register) { +$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$connections, &$register) { Console::success('Worker ' . ++$workerId . ' started succefully'); $attempts = 0; @@ -102,10 +111,10 @@ $server->on("workerStart", function ($server, $workerId) use (&$subscriptions, & if ($attempts > 0) { Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . '). Attempting restart in 5 seconds (attempt #' . $attempts . ')'); - sleep(5); // 1 sec delay between connection attempts + sleep(5); // 5 sec delay between connection attempts } - $redis = $register->get('cache'); + $redis = $register->get('cache')->get(); if ($redis->ping(true)) { $attempts = 0; @@ -157,7 +166,7 @@ $server->on("workerStart", function ($server, $workerId) use (&$subscriptions, & Console::error('Failed to restart pub/sub...'); }); -$server->on("start", function (Server $server) { +$server->on('start', function (Server $server) { Console::success('Server started succefully'); Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}"); @@ -182,7 +191,7 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio App::setResource('consoleDB', function () use (&$register) { $consoleDB = new Database(); - $consoleDB->setAdapter(new MySQLAdapter($register)); // TODO: Add Redis + $consoleDB->setAdapter(new RedisAdapter(new MySQLAdapter($register), $register, true)); // TODO: Add Redis $consoleDB->setNamespace('app_console'); // Should be replaced with param if we want to have parent projects $consoleDB->setMocks(Config::getParam('collections', [])); @@ -248,8 +257,8 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio $timeLimit = new TimeLimit('url:{url},ip:{ip}', 60, 60, function () use ($register) { return $register->get('db'); }); - $timeLimit->setNamespace('app_' . $project->getId()); $timeLimit + ->setNamespace('app_' . $project->getId()) ->setParam('{ip}', $request->getIP()) ->setParam('{url}', $request->getURI()); @@ -355,4 +364,4 @@ $server->on('close', function (Server $server, int $fd) use (&$connections, &$su Console::info('Connection close: ' . $fd); }); -$server->start(); \ No newline at end of file +$server->start(); diff --git a/src/Appwrite/Database/Adapter/Redis.php b/src/Appwrite/Database/Adapter/Redis.php index a1e440112d..47ad309569 100644 --- a/src/Appwrite/Database/Adapter/Redis.php +++ b/src/Appwrite/Database/Adapter/Redis.php @@ -19,16 +19,22 @@ class Redis extends Adapter */ protected $adapter; + /** + * @var bool + */ + protected $isPool; + /** * Redis constructor. * * @param Adapter $adapter * @param Registry $register */ - public function __construct(Adapter $adapter, Registry $register) + public function __construct(Adapter $adapter, Registry $register, $isPool = false) { $this->register = $register; $this->adapter = $adapter; + $this->isPool = $isPool; } /** @@ -261,7 +267,9 @@ class Redis extends Adapter */ protected function getRedis(): Client { - return $this->register->get('cache'); + return $this->isPool ? + $this->register->get('cache')->get() : + $this->register->get('cache'); } /** diff --git a/src/Appwrite/Realtime/Realtime.php b/src/Appwrite/Realtime/Realtime.php index bf812f00e2..5166f4bff1 100644 --- a/src/Appwrite/Realtime/Realtime.php +++ b/src/Appwrite/Realtime/Realtime.php @@ -60,6 +60,8 @@ class Realtime } /** + * Identifies the receivers of all subscriptions, based on the permissions and event. + * * @param array $event * @param array $connections * @param array $subscriptions