1
0
Fork 0
mirror of synced 2024-06-29 03:30:34 +12:00

add redis connection pool

This commit is contained in:
Torsten Dittmann 2021-02-26 10:21:07 +01:00
parent 5a2d7d4aa7
commit 28eed022f6
3 changed files with 41 additions and 22 deletions

View file

@ -21,16 +21,16 @@ use Utopia\Config\Config;
use Utopia\Registry\Registry;
use Utopia\Swoole\Request as SwooleRequest;
use PDO as PDONative;
use Swoole\Database\RedisConfig;
use Swoole\Database\RedisPool;
use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit;
/**
* TODO List
*
* - Abuse Control / x mesages per connection
* - CORS Validation
* - Limit payload size
* - Message structure: { status: "ok"|"error", event: EVENT_NAME, data: <any arbitrary data> }
* - JWT Authentication (in path / or in message)
*
* Protocols Support:
@ -43,10 +43,13 @@ use Utopia\Abuse\Adapters\TimeLimit;
ini_set('default_socket_timeout', -1);
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
$server = new Server("0.0.0.0", 80);
$server = new Server('0.0.0.0', 80);
$server->set([
'worker_num' => 1
'websocket_compression' => true,
'package_max_length' => 81920
]);
$subscriptions = [];
$connections = [];
@ -71,27 +74,33 @@ $register->set('db', function () { // Register DB connection
return $pdo;
});
$register->set('cache', function () { // Register cache connection
$redis = new Redis();
$redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
$register->set('cache', function () use (&$pool) { // Register cache connection
$config = new RedisConfig();
$user = App::getEnv('_APP_REDIS_USER', '');
$pass = App::getEnv('_APP_REDIS_PASS', '');
$auth = [];
$auth = '';
if (!empty($user)) {
$auth["user"] = $user;
$auth = $user;
}
if (!empty($pass)) {
$auth["pass"] = $pass;
$auth += ':' . $pass;
}
if (!empty($auth)) {
$redis->auth($auth);
$config->withAuth($auth);
}
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
$config
->withHost(App::getEnv('_APP_REDIS_HOST', ''))
->withPort(App::getEnv('_APP_REDIS_PORT', ''));
$pool = new RedisPool($config);
return $redis;
return $pool;
});
$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections, &$register) {
$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$connections, &$register) {
Console::success('Worker ' . ++$workerId . ' started succefully');
$attempts = 0;
@ -102,10 +111,10 @@ $server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &
if ($attempts > 0) {
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
sleep(5); // 1 sec delay between connection attempts
sleep(5); // 5 sec delay between connection attempts
}
$redis = $register->get('cache');
$redis = $register->get('cache')->get();
if ($redis->ping(true)) {
$attempts = 0;
@ -157,7 +166,7 @@ $server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &
Console::error('Failed to restart pub/sub...');
});
$server->on("start", function (Server $server) {
$server->on('start', function (Server $server) {
Console::success('Server started succefully');
Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}");
@ -182,7 +191,7 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
App::setResource('consoleDB', function () use (&$register) {
$consoleDB = new Database();
$consoleDB->setAdapter(new MySQLAdapter($register)); // TODO: Add Redis
$consoleDB->setAdapter(new RedisAdapter(new MySQLAdapter($register), $register, true)); // TODO: Add Redis
$consoleDB->setNamespace('app_console'); // Should be replaced with param if we want to have parent projects
$consoleDB->setMocks(Config::getParam('collections', []));
@ -248,8 +257,8 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 60, 60, function () use ($register) {
return $register->get('db');
});
$timeLimit->setNamespace('app_' . $project->getId());
$timeLimit
->setNamespace('app_' . $project->getId())
->setParam('{ip}', $request->getIP())
->setParam('{url}', $request->getURI());
@ -355,4 +364,4 @@ $server->on('close', function (Server $server, int $fd) use (&$connections, &$su
Console::info('Connection close: ' . $fd);
});
$server->start();
$server->start();

View file

@ -19,16 +19,22 @@ class Redis extends Adapter
*/
protected $adapter;
/**
* @var bool
*/
protected $isPool;
/**
* Redis constructor.
*
* @param Adapter $adapter
* @param Registry $register
*/
public function __construct(Adapter $adapter, Registry $register)
public function __construct(Adapter $adapter, Registry $register, $isPool = false)
{
$this->register = $register;
$this->adapter = $adapter;
$this->isPool = $isPool;
}
/**
@ -261,7 +267,9 @@ class Redis extends Adapter
*/
protected function getRedis(): Client
{
return $this->register->get('cache');
return $this->isPool ?
$this->register->get('cache')->get() :
$this->register->get('cache');
}
/**

View file

@ -60,6 +60,8 @@ class Realtime
}
/**
* Identifies the receivers of all subscriptions, based on the permissions and event.
*
* @param array $event
* @param array $connections
* @param array $subscriptions