From ba4eedb114f7b8d6220c2ea83e22fc92790536d5 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Mon, 1 Mar 2021 15:44:01 +0100 Subject: [PATCH] use redis connection pool --- app/realtime.php | 69 ++++++++++--------------- src/Appwrite/Database/Adapter/Redis.php | 11 +++- src/Appwrite/Realtime/Realtime.php | 6 +-- 3 files changed, 39 insertions(+), 47 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 2875359a0..3e4a0abd5 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -9,8 +9,9 @@ use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Database; use Appwrite\Database\Document; use Appwrite\Database\Validator\Authorization; -use Appwrite\Extend\PDO; use Appwrite\Realtime\Realtime; +use Swoole\Database\RedisConfig; +use Swoole\Database\RedisPool; use Swoole\WebSocket\Server; use Swoole\Http\Request; use Swoole\Process; @@ -18,9 +19,7 @@ use Swoole\WebSocket\Frame; use Utopia\App; use Utopia\CLI\Console; use Utopia\Config\Config; -use Utopia\Registry\Registry; use Utopia\Swoole\Request as SwooleRequest; -use PDO as PDONative; use Utopia\Abuse\Abuse; use Utopia\Abuse\Adapters\TimeLimit; @@ -51,45 +50,30 @@ $server->set([ $subscriptions = []; $connections = []; -$register = new Registry(); - -$register->set('db', function () { // Register DB connection - $dbHost = App::getEnv('_APP_DB_HOST', ''); - $dbUser = App::getEnv('_APP_DB_USER', ''); - $dbPass = App::getEnv('_APP_DB_PASS', ''); - $dbScheme = App::getEnv('_APP_DB_SCHEMA', ''); - - $pdo = new PDO("mysql:host={$dbHost};dbname={$dbScheme};charset=utf8mb4", $dbUser, $dbPass, array( - PDONative::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4', - PDONative::ATTR_TIMEOUT => 3, // Seconds - PDONative::ATTR_PERSISTENT => true - )); - - // Connection settings - $pdo->setAttribute(PDONative::ATTR_DEFAULT_FETCH_MODE, PDONative::FETCH_ASSOC); // Return arrays - $pdo->setAttribute(PDONative::ATTR_ERRMODE, PDONative::ERRMODE_EXCEPTION); // Handle all errors with exceptions - - return $pdo; -}); - -$register->set('cache', function () { // Register cache connection - $redis = new Redis(); - $redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); - $user = App::getEnv('_APP_REDIS_USER', ''); - $pass = App::getEnv('_APP_REDIS_PASS', ''); - $auth = []; - if (!empty($user)) { - $auth["user"] = $user; +$register->set('redis', function () { + $user = App::getEnv('_APP_REDIS_USER',''); + $pass = App::getEnv('_APP_REDIS_PASS',''); + $auth = ''; + if(!empty($user)) { + $auth += $user; } - if (!empty($pass)) { - $auth["pass"] = $pass; + if(!empty($pass)) { + $auth += ':' . $pass; } - if (!empty($auth)) { - $redis->auth($auth); - } - $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); - return $redis; + $config = new RedisConfig(); + $config + ->withHost(App::getEnv('_APP_REDIS_HOST', '')) + ->withPort(App::getEnv('_APP_REDIS_PORT', '')) + ->withAuth($auth) + ->withTimeout(0) + ->withReadTimeout(0) + ->withRetryInterval(0); + + + $pool = new RedisPool($config); + + return $pool; }); $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$connections, &$register) { @@ -106,7 +90,8 @@ $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, & sleep(5); // 5 sec delay between connection attempts } - $redis = $register->get('cache'); + $redis = $register->get('redis')->get(); + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { $attempts = 0; @@ -183,7 +168,7 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio App::setResource('consoleDB', function () use (&$register) { $consoleDB = new Database(); - $consoleDB->setAdapter(new MySQLAdapter($register)); // TODO: Add Redis + $consoleDB->setAdapter(new RedisAdapter(new MySQLAdapter($register), $register, true)); $consoleDB->setNamespace('app_console'); // Should be replaced with param if we want to have parent projects $consoleDB->setMocks(Config::getParam('collections', [])); @@ -280,7 +265,7 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio $server->close($connection); } - Realtime::subscribe($project->getId(), $connection, $subscriptions, $connections, $roles, $channels); + Realtime::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels); $server->push($connection, json_encode($channels)); }); diff --git a/src/Appwrite/Database/Adapter/Redis.php b/src/Appwrite/Database/Adapter/Redis.php index a1e440112..3c5cdb503 100644 --- a/src/Appwrite/Database/Adapter/Redis.php +++ b/src/Appwrite/Database/Adapter/Redis.php @@ -19,16 +19,23 @@ class Redis extends Adapter */ protected $adapter; + /** + * @var bool + */ + protected $isPool = false; + /** * Redis constructor. * * @param Adapter $adapter * @param Registry $register + * @param bool $isPool */ - public function __construct(Adapter $adapter, Registry $register) + public function __construct(Adapter $adapter, Registry $register, bool $isPool = false) { $this->register = $register; $this->adapter = $adapter; + $this->isPool = $isPool; } /** @@ -261,7 +268,7 @@ class Redis extends Adapter */ protected function getRedis(): Client { - return $this->register->get('cache'); + return $this->isPool ? $this->register->get('redis')->get() : $this->register->get('cache'); } /** diff --git a/src/Appwrite/Realtime/Realtime.php b/src/Appwrite/Realtime/Realtime.php index 62e2be5d5..1dc5170ed 100644 --- a/src/Appwrite/Realtime/Realtime.php +++ b/src/Appwrite/Realtime/Realtime.php @@ -79,7 +79,7 @@ class Realtime static function identifyReceivers(array &$event, array &$connections, array &$subscriptions) { $receivers = []; - foreach ($connections as $fd => $connection) { + foreach ($connections as $connection) { if ($connection['projectId'] !== $event['project']) { continue; } @@ -133,7 +133,7 @@ class Realtime $subscriptions[$projectId] = []; } - foreach ($roles as $key => $role) { + foreach ($roles as $role) { if (!isset($subscriptions[$projectId][$role])) { // Add user first connection $subscriptions[$projectId][$role] = []; } @@ -161,7 +161,7 @@ class Realtime $projectId = $connections[$connection]['projectId'] ?? ''; $roles = $connections[$connection]['roles'] ?? []; - foreach ($roles as $key => $role) { + foreach ($roles as $role) { foreach ($subscriptions[$projectId][$role] as $channel => $list) { unset($subscriptions[$projectId][$role][$channel][$connection]); // Remove connection