1
0
Fork 0
mirror of synced 2024-06-02 19:04:49 +12:00

add custom connection pool

This commit is contained in:
Torsten Dittmann 2021-03-16 15:30:45 +01:00
parent 520c065236
commit e14a495048
6 changed files with 177 additions and 100 deletions

View file

@ -31,15 +31,6 @@ ResqueScheduler::enqueueAt(\time() + 30, 'v1-certificates', 'CertificatesV1', [
'validateCNAME' => false,
]);
$register->set('cache', function () use ($register) { // Register cache connection
$redis = $register->get('redisPool')->get();
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
return $redis;
}, true);
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
$http = new Server("0.0.0.0", App::getEnv('PORT', 80));
$payloadSize = max(4000000 /* 4mb */, App::getEnv('_APP_STORAGE_LIMIT', 10000000 /* 10mb */));

View file

@ -148,27 +148,6 @@ Database::addFilter('encrypt',
/*
* Registry
*/
$register->set('dbPool', function () { // Register DB connection
$config = new PDOConfig();
$config
->withHost(App::getEnv('_APP_DB_HOST', ''))
->withPort(App::getEnv('_APP_DB_PORT', ''))
->withDbName(App::getEnv('_APP_DB_SCHEMA', ''))
->withUsername(App::getEnv('_APP_DB_USER', ''))
->withPassword(App::getEnv('_APP_DB_PASS', ''))
->withCharset('utf8mb4')
->withOptions([
PDONative::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4',
PDONative::ATTR_TIMEOUT => 3, // Seconds
PDONative::ATTR_PERSISTENT => true,
PDONative::ATTR_DEFAULT_FETCH_MODE => PDONative::FETCH_ASSOC,
PDONative::ATTR_ERRMODE => PDONative::ERRMODE_EXCEPTION,
]);
$pool = new PDOPool($config, 16384); // TODO: Investigate pool size
return $pool;
});
$register->set('db', function () use ($register) {
$dbHost = App::getEnv('_APP_DB_HOST', '');
$dbUser = App::getEnv('_APP_DB_USER', '');
@ -206,30 +185,6 @@ $register->set('statsd', function () { // Register DB connection
return $statsd;
});
$register->set('redisPool', function () {
$user = App::getEnv('_APP_REDIS_USER', '');
$pass = App::getEnv('_APP_REDIS_PASS', '');
$auth = '';
if (!empty($user)) {
$auth += $user;
}
if (!empty($pass)) {
$auth += ':' . $pass;
}
$config = new RedisConfig();
$config
->withHost(App::getEnv('_APP_REDIS_HOST', ''))
->withPort(App::getEnv('_APP_REDIS_PORT', ''))
->withAuth($auth)
->withTimeout(0)
->withReadTimeout(0)
->withRetryInterval(0);
$pool = new RedisPool($config, 16384); // TODO: Investigate pool size
return $pool;
});
$register->set('cache', function () { // Register cache connection
$redis = new Redis();
$redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));

View file

@ -2,12 +2,11 @@
require_once __DIR__ . '/init.php';
use Appwrite\Database\Pool\PDOPool;
use Appwrite\Database\Pool\RedisPool;
use Appwrite\Network\Validator\Origin;
use Appwrite\Realtime\Realtime;
use Appwrite\Utopia\Response;
use Swoole\Database\PDOProxy;
use Swoole\Database\RedisConfig;
use Swoole\Database\RedisPool;
use Swoole\Process;
use Swoole\Http\Request;
use Swoole\Http\Response as SwooleResponse;
@ -30,21 +29,7 @@ use Utopia\Abuse\Adapters\TimeLimit;
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
$register->set('db', function () use ($register) {
$pool = $register->get('dbPool');
$pdo = $pool->get()->__getObject();
return $pdo;
}, true);
$register->set('cache', function () use ($register) { // Register cache connection
$redis = $register->get('redisPool')->get();
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
return $redis;
}, true);
$server = new Server('0.0.0.0', 80);
$server = new Server('0.0.0.0', 80, SWOOLE_PROCESS);
$server->set([
'package_max_length' => 64000 // Default maximum Package Size (64kb)
@ -53,11 +38,38 @@ $server->set([
$subscriptions = [];
$connections = [];
$register->set('dbPool', function () { // Register DB connection
$dbHost = App::getEnv('_APP_DB_HOST', '');
$dbUser = App::getEnv('_APP_DB_USER', '');
$dbPass = App::getEnv('_APP_DB_PASS', '');
$dbScheme = App::getEnv('_APP_DB_SCHEMA', '');
$pool = new PDOPool(20, $dbHost, $dbScheme, $dbUser, $dbPass);
return $pool;
});
$register->set('redisPool', function () {
$user = App::getEnv('_APP_REDIS_USER', '');
$pass = App::getEnv('_APP_REDIS_PASS', '');
$auth = [];
if ($user) {
$auth[] = $user;
}
if ($pass) {
$auth[] = $pass;
}
$pool = new RedisPool(20, App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''), $auth);
return $pool;
});
$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$connections, &$register) {
Console::success('Worker ' . $workerId . ' started succefully');
$attempts = 0;
$start = time();
$redisPool = $register->get('redisPool');
while ($attempts < 300) {
try {
@ -67,7 +79,8 @@ $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &
sleep(5); // 5 sec delay between connection attempts
}
$redis = $register->get('cache');
/** @var Swoole\Coroutine\Redis $redis */
$redis = $redisPool->get();
if ($redis->ping(true)) {
$attempts = 0;
@ -108,6 +121,7 @@ $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &
});
} catch (\Throwable $th) {
Console::error('Pub/sub error: ' . $th->getMessage());
$redisPool->put($redis);
$attempts++;
continue;
}
@ -135,6 +149,17 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
$connection = $request->fd;
$request = new SwooleRequest($request);
$db = $register->get('dbPool')->get();
$redis = $register->get('redisPool')->get();
$register->set('db', function () use (&$db) {
return $db;
});
$register->set('cache', function () use (&$redis) { // Register cache connection
return $redis;
});
Console::info("Connection open (user: {$connection}, worker: {$server->getWorkerId()})");
App::setResource('request', function () use ($request) {
@ -145,16 +170,16 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
return new Response(new SwooleResponse());
});
/** @var Appwrite\Database\Document $user */
$user = $app->getResource('user');
/** @var Appwrite\Database\Document $project */
$project = $app->getResource('project');
/** @var Appwrite\Database\Document $console */
$console = $app->getResource('console');
try {
/** @var Appwrite\Database\Document $user */
$user = $app->getResource('user');
/** @var Appwrite\Database\Document $project */
$project = $app->getResource('project');
/** @var Appwrite\Database\Document $console */
$console = $app->getResource('console');
/*
* Project Check
*/
@ -167,8 +192,8 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
*
* Abuse limits are connecting 128 times per minute and ip address.
*/
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, function () use ($register) {
return $register->get('db');
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, function () use ($db) {
return $db;
});
$timeLimit
->setNamespace('app_' . $project->getId())
@ -208,21 +233,6 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
Realtime::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels);
$server->push($connection, json_encode($channels));
/**
* Put used PDO and Redis Connections back into their pools.
*/
/** @var Swoole\Database\PDOPool $dbPool */
$dbPool = $register->get('dbPool');
$dbPool->put(new PDOProxy(function () use ($register) {
return $register->get('db');
}
));
/** @var Swoole\Database\RedisPool $redisPool */
$redisPool = $register->get('redisPool');
$redisPool->put($register->get('cache'));
} catch (\Throwable $th) {
$response = [
'code' => $th->getCode(),
@ -231,6 +241,16 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
$server->push($connection, json_encode($response));
$server->close($connection);
}
/**
* Put used PDO and Redis Connections back into their pools.
*/
/** @var PDOPool $dbPool */
$dbPool = $register->get('dbPool');
$dbPool->put($db);
/** @var RedisPool $redisPool */
$redisPool = $register->get('redisPool');
$redisPool->put($redis);
});
$server->on('message', function (Server $server, Frame $frame) {

View file

@ -0,0 +1,19 @@
<?php
namespace Appwrite\Database;
abstract class Pool
{
protected $available = true;
protected $pool;
protected $size = 5;
abstract public function get();
public function destruct ()
{
$this->available = false;
while (!$this->pool->isEmpty()) {
$this->pool->pop();
}
}
}

View file

@ -0,0 +1,50 @@
<?php
namespace Appwrite\Database\Pool;
use Appwrite\Database\Pool;
use Exception;
use PDO;
use SplQueue;
class PDOPool extends Pool
{
public function __construct(int $size, string $host = 'localhost', string $schema = 'appwrite', string $user = '', string $pass = '', string $charset = 'utf8mb4')
{
$this->pool = new SplQueue;
$this->size = $size;
for ($i=0; $i < $this->size; $i++) {
$pdo = new PDO(
"mysql:".
"host={$host};".
"dbname={$schema};" .
"charset={$charset}",
$user,
$pass,
[
PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4',
PDO::ATTR_TIMEOUT => 3, // Seconds
PDO::ATTR_PERSISTENT => true,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => true
]
);
$this->pool->enqueue($pdo);
}
}
public function put (PDO $pdo)
{
$this->pool->enqueue($pdo);
}
public function get (): PDO
{
if ($this->available && count($this->pool) > 0) {
return $this->pool->dequeue();
}
sleep(0.01);
return $this->get();
}
}

View file

@ -0,0 +1,42 @@
<?php
namespace Appwrite\Database\Pool;
use Appwrite\Database\Pool;
use SplQueue;
use Redis;
class RedisPool extends Pool
{
public function __construct(int $size, string $host, int $port, array $auth = [])
{
$this->pool = new SplQueue;
$this->size = $size;
for ($i=0; $i < $this->size; $i++) {
$redis = new Redis();
$redis->pconnect($host, $port);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($auth) {
$redis->auth($auth);
}
$this->pool->enqueue($redis);
}
}
public function put (Redis $redis)
{
$this->pool->enqueue($redis);
}
public function get (): Redis
{
if ($this->available && !$this->pool->isEmpty()) {
return $this->pool->dequeue();
}
sleep(0.1);
return $this->get();
}
}