use redis connection pool
This commit is contained in:
parent
e79c5e93d0
commit
ba4eedb114
|
@ -9,8 +9,9 @@ use Appwrite\Database\Adapter\Redis as RedisAdapter;
|
|||
use Appwrite\Database\Database;
|
||||
use Appwrite\Database\Document;
|
||||
use Appwrite\Database\Validator\Authorization;
|
||||
use Appwrite\Extend\PDO;
|
||||
use Appwrite\Realtime\Realtime;
|
||||
use Swoole\Database\RedisConfig;
|
||||
use Swoole\Database\RedisPool;
|
||||
use Swoole\WebSocket\Server;
|
||||
use Swoole\Http\Request;
|
||||
use Swoole\Process;
|
||||
|
@ -18,9 +19,7 @@ use Swoole\WebSocket\Frame;
|
|||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\Swoole\Request as SwooleRequest;
|
||||
use PDO as PDONative;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\TimeLimit;
|
||||
|
||||
|
@ -51,45 +50,30 @@ $server->set([
|
|||
$subscriptions = [];
|
||||
$connections = [];
|
||||
|
||||
$register = new Registry();
|
||||
|
||||
$register->set('db', function () { // Register DB connection
|
||||
$dbHost = App::getEnv('_APP_DB_HOST', '');
|
||||
$dbUser = App::getEnv('_APP_DB_USER', '');
|
||||
$dbPass = App::getEnv('_APP_DB_PASS', '');
|
||||
$dbScheme = App::getEnv('_APP_DB_SCHEMA', '');
|
||||
|
||||
$pdo = new PDO("mysql:host={$dbHost};dbname={$dbScheme};charset=utf8mb4", $dbUser, $dbPass, array(
|
||||
PDONative::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4',
|
||||
PDONative::ATTR_TIMEOUT => 3, // Seconds
|
||||
PDONative::ATTR_PERSISTENT => true
|
||||
));
|
||||
|
||||
// Connection settings
|
||||
$pdo->setAttribute(PDONative::ATTR_DEFAULT_FETCH_MODE, PDONative::FETCH_ASSOC); // Return arrays
|
||||
$pdo->setAttribute(PDONative::ATTR_ERRMODE, PDONative::ERRMODE_EXCEPTION); // Handle all errors with exceptions
|
||||
|
||||
return $pdo;
|
||||
});
|
||||
|
||||
$register->set('cache', function () { // Register cache connection
|
||||
$redis = new Redis();
|
||||
$redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
|
||||
$user = App::getEnv('_APP_REDIS_USER', '');
|
||||
$pass = App::getEnv('_APP_REDIS_PASS', '');
|
||||
$auth = [];
|
||||
if (!empty($user)) {
|
||||
$auth["user"] = $user;
|
||||
$register->set('redis', function () {
|
||||
$user = App::getEnv('_APP_REDIS_USER','');
|
||||
$pass = App::getEnv('_APP_REDIS_PASS','');
|
||||
$auth = '';
|
||||
if(!empty($user)) {
|
||||
$auth += $user;
|
||||
}
|
||||
if (!empty($pass)) {
|
||||
$auth["pass"] = $pass;
|
||||
if(!empty($pass)) {
|
||||
$auth += ':' . $pass;
|
||||
}
|
||||
if (!empty($auth)) {
|
||||
$redis->auth($auth);
|
||||
}
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
return $redis;
|
||||
$config = new RedisConfig();
|
||||
$config
|
||||
->withHost(App::getEnv('_APP_REDIS_HOST', ''))
|
||||
->withPort(App::getEnv('_APP_REDIS_PORT', ''))
|
||||
->withAuth($auth)
|
||||
->withTimeout(0)
|
||||
->withReadTimeout(0)
|
||||
->withRetryInterval(0);
|
||||
|
||||
|
||||
$pool = new RedisPool($config);
|
||||
|
||||
return $pool;
|
||||
});
|
||||
|
||||
$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$connections, &$register) {
|
||||
|
@ -106,7 +90,8 @@ $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &
|
|||
sleep(5); // 5 sec delay between connection attempts
|
||||
}
|
||||
|
||||
$redis = $register->get('cache');
|
||||
$redis = $register->get('redis')->get();
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
if ($redis->ping(true)) {
|
||||
$attempts = 0;
|
||||
|
@ -183,7 +168,7 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
|
|||
|
||||
App::setResource('consoleDB', function () use (&$register) {
|
||||
$consoleDB = new Database();
|
||||
$consoleDB->setAdapter(new MySQLAdapter($register)); // TODO: Add Redis
|
||||
$consoleDB->setAdapter(new RedisAdapter(new MySQLAdapter($register), $register, true));
|
||||
$consoleDB->setNamespace('app_console'); // Should be replaced with param if we want to have parent projects
|
||||
$consoleDB->setMocks(Config::getParam('collections', []));
|
||||
|
||||
|
@ -280,7 +265,7 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
|
|||
$server->close($connection);
|
||||
}
|
||||
|
||||
Realtime::subscribe($project->getId(), $connection, $subscriptions, $connections, $roles, $channels);
|
||||
Realtime::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels);
|
||||
|
||||
$server->push($connection, json_encode($channels));
|
||||
});
|
||||
|
|
|
@ -19,16 +19,23 @@ class Redis extends Adapter
|
|||
*/
|
||||
protected $adapter;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
protected $isPool = false;
|
||||
|
||||
/**
|
||||
* Redis constructor.
|
||||
*
|
||||
* @param Adapter $adapter
|
||||
* @param Registry $register
|
||||
* @param bool $isPool
|
||||
*/
|
||||
public function __construct(Adapter $adapter, Registry $register)
|
||||
public function __construct(Adapter $adapter, Registry $register, bool $isPool = false)
|
||||
{
|
||||
$this->register = $register;
|
||||
$this->adapter = $adapter;
|
||||
$this->isPool = $isPool;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -261,7 +268,7 @@ class Redis extends Adapter
|
|||
*/
|
||||
protected function getRedis(): Client
|
||||
{
|
||||
return $this->register->get('cache');
|
||||
return $this->isPool ? $this->register->get('redis')->get() : $this->register->get('cache');
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -79,7 +79,7 @@ class Realtime
|
|||
static function identifyReceivers(array &$event, array &$connections, array &$subscriptions)
|
||||
{
|
||||
$receivers = [];
|
||||
foreach ($connections as $fd => $connection) {
|
||||
foreach ($connections as $connection) {
|
||||
if ($connection['projectId'] !== $event['project']) {
|
||||
continue;
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ class Realtime
|
|||
$subscriptions[$projectId] = [];
|
||||
}
|
||||
|
||||
foreach ($roles as $key => $role) {
|
||||
foreach ($roles as $role) {
|
||||
if (!isset($subscriptions[$projectId][$role])) { // Add user first connection
|
||||
$subscriptions[$projectId][$role] = [];
|
||||
}
|
||||
|
@ -161,7 +161,7 @@ class Realtime
|
|||
$projectId = $connections[$connection]['projectId'] ?? '';
|
||||
$roles = $connections[$connection]['roles'] ?? [];
|
||||
|
||||
foreach ($roles as $key => $role) {
|
||||
foreach ($roles as $role) {
|
||||
foreach ($subscriptions[$projectId][$role] as $channel => $list) {
|
||||
unset($subscriptions[$projectId][$role][$channel][$connection]); // Remove connection
|
||||
|
||||
|
|
Loading…
Reference in a new issue