Init new connection pool, removed old extensions
This commit is contained in:
parent
f49f1b4755
commit
daecc1aa76
5 changed files with 184 additions and 306 deletions
|
@ -307,7 +307,7 @@ return [
|
|||
'filter' => 'password'
|
||||
],
|
||||
// [
|
||||
// 'name' => '_APP_DB_PROJECT',
|
||||
// 'name' => '_APP_CONNECTIONS_DB_PROJECT',
|
||||
// 'description' => 'A list of comma-separated key value pairs representing Project DBs where key is the database name and value is the DSN connection string.',
|
||||
// 'introduction' => 'TBD',
|
||||
// 'default' => 'db_fra1_01=mysql://user:password@mariadb:3306/appwrite',
|
||||
|
@ -316,7 +316,7 @@ return [
|
|||
// 'filter' => ''
|
||||
// ],
|
||||
// [
|
||||
// 'name' => '_APP_DB_CONSOLE',
|
||||
// 'name' => '_APP_CONNECTIONS_DB_CONSOLE',
|
||||
// 'description' => 'A key value pair representing the Console DB where key is the database name and value is the DSN connection string.',
|
||||
// 'introduction' => 'TBD',
|
||||
// 'default' => 'db_fra1_01=mysql://user:password@mariadb:3306/appwrite',
|
||||
|
|
257
app/init.php
257
app/init.php
|
@ -34,6 +34,7 @@ use Appwrite\Event\Database as EventDatabase;
|
|||
use Appwrite\Event\Event;
|
||||
use Appwrite\Event\Mail;
|
||||
use Appwrite\Event\Phone;
|
||||
use Appwrite\Event\Delete;
|
||||
use Appwrite\Network\Validator\Email;
|
||||
use Appwrite\Network\Validator\IP;
|
||||
use Appwrite\Network\Validator\URL;
|
||||
|
@ -41,25 +42,19 @@ use Appwrite\OpenSSL\OpenSSL;
|
|||
use Appwrite\Usage\Stats;
|
||||
use Appwrite\Utopia\View;
|
||||
use Utopia\App;
|
||||
use Utopia\Validator\Range;
|
||||
use Utopia\Validator\WhiteList;
|
||||
use Utopia\Database\ID;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Database\Validator\DatetimeValidator;
|
||||
use Utopia\Database\Validator\Structure;
|
||||
use Utopia\Logger\Logger;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Locale\Locale;
|
||||
use Utopia\Registry\Registry;
|
||||
use MaxMind\Db\Reader;
|
||||
use PHPMailer\PHPMailer\PHPMailer;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Database;
|
||||
use Appwrite\Database\Pools;
|
||||
use Appwrite\Event\Delete;
|
||||
use Utopia\Database\Validator\Structure;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Validator\Range;
|
||||
use Utopia\Validator\WhiteList;
|
||||
use Swoole\Database\RedisConfig;
|
||||
use Swoole\Database\RedisPool;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Database\Validator\DatetimeValidator;
|
||||
use Utopia\Storage\Device;
|
||||
use Utopia\Storage\Storage;
|
||||
use Utopia\Storage\Device\Backblaze;
|
||||
|
@ -68,6 +63,16 @@ use Utopia\Storage\Device\Local;
|
|||
use Utopia\Storage\Device\S3;
|
||||
use Utopia\Storage\Device\Linode;
|
||||
use Utopia\Storage\Device\Wasabi;
|
||||
use MaxMind\Db\Reader;
|
||||
use PHPMailer\PHPMailer\PHPMailer;
|
||||
use Swoole\Database\PDOProxy;
|
||||
use Utopia\Cache\Adapter\Redis as RedisCache;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\Database\Adapter\MariaDB;
|
||||
use Utopia\Database\Adapter\MySQL;
|
||||
use Utopia\Pools\Connection;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Pools\Pool;
|
||||
|
||||
const APP_NAME = 'Appwrite';
|
||||
const APP_DOMAIN = 'appwrite.io';
|
||||
|
@ -492,53 +497,146 @@ $register->set('logger', function () {
|
|||
return new Logger($adapter);
|
||||
});
|
||||
|
||||
$register->set('pools', function () {
|
||||
|
||||
$register->set('dbPool', function () {
|
||||
/** Parse the console databases */
|
||||
$consoleDB = App::getEnv('_APP_DB_CONSOLE', '');
|
||||
$consoleDB = explode(',', $consoleDB)[0];
|
||||
$consoleDB = explode('=', $consoleDB);
|
||||
$name = $consoleDB[0];
|
||||
$dsn = $consoleDB[1];
|
||||
$consoleDBs[$name] = $dsn;
|
||||
$group= new Group();
|
||||
|
||||
/** Parse the project databases */
|
||||
$projectDBs = [];
|
||||
$projectDB = App::getEnv('_APP_DB_PROJECT', '');
|
||||
$projectDB = explode(',', $projectDB);
|
||||
foreach ($projectDB as $db) {
|
||||
$db = explode('=', $db);
|
||||
$name = $db[0];
|
||||
$dsn = $db[1];
|
||||
$projectDBs[$name] = $dsn;
|
||||
$connections = [
|
||||
'console' => [
|
||||
'type' => 'database',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_DB_CONSOLE', ''),
|
||||
'multiple' => false,
|
||||
'schemes' => ['mariadb', 'mysql'],
|
||||
],
|
||||
'database' => [
|
||||
'type' => 'database',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_DB_PROJECT', ''),
|
||||
'multiple' => true,
|
||||
'schemes' => ['mariadb', 'mysql'],
|
||||
],
|
||||
'queue' => [
|
||||
'type' => 'queue',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_QUEUE', ''),
|
||||
'multiple' => false,
|
||||
'schemes' => ['redis'],
|
||||
],
|
||||
'pubsub' => [
|
||||
'type' => 'pubsub',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_PUBSUB', ''),
|
||||
'multiple' => false,
|
||||
'schemes' => ['redis'],
|
||||
],
|
||||
'cache' => [
|
||||
'type' => 'cache',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_CACHE', ''),
|
||||
'multiple' => true,
|
||||
'schemes' => ['redis'],
|
||||
],
|
||||
];
|
||||
|
||||
foreach ($connections as $key => $connection) {
|
||||
$type = $connection['type'] ?? '';
|
||||
$dsns = $connection['dsns'] ?? '';
|
||||
$multipe = $connection['multiple'] ?? false;
|
||||
$schemes = $connection['schemes'] ?? [];
|
||||
|
||||
$variable = explode(',', $connection['dsns'] ?? '');
|
||||
$dsns = [];
|
||||
|
||||
foreach ($variable as $dsn) {
|
||||
$dsn = explode('=', $dsn);
|
||||
$name = ($multipe) ? $key.'_'.$dsn[0] : $key;
|
||||
$dsn = $dsn[1];
|
||||
|
||||
$dsn = new DSN($dsn);
|
||||
$dsnHost = $dsn->getHost();
|
||||
$dsnPort = $dsn->getPort();
|
||||
$dsnUser = $dsn->getUser();
|
||||
$dsnPass = $dsn->getPassword();
|
||||
$dsnScheme = $dsn->getDatabase();
|
||||
|
||||
if(!in_array($dsns[$name]->getScheme(), $schemes)) {
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, "Invalid console database scheme");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Resource
|
||||
*
|
||||
* Creation could be reused accross connection types like database, cache, queue, etc.
|
||||
*
|
||||
* Resource assignment to an adapter will happen below.
|
||||
*/
|
||||
|
||||
switch ($dsn->getScheme()) {
|
||||
case 'mysql':
|
||||
case 'mariadb':
|
||||
$resource = new PDOProxy("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnScheme};charset=utf8mb4", $dsnUser, $dsnPass, array(
|
||||
PDO::ATTR_TIMEOUT => 3, // Seconds
|
||||
PDO::ATTR_PERSISTENT => true,
|
||||
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
|
||||
PDO::ATTR_ERRMODE => App::isDevelopment() ? PDO::ERRMODE_WARNING : PDO::ERRMODE_SILENT, // If in production mode, warnings are not displayed
|
||||
PDO::ATTR_EMULATE_PREPARES => true,
|
||||
PDO::ATTR_STRINGIFY_FETCHES => true
|
||||
));
|
||||
break;
|
||||
case 'redis':
|
||||
$resource = new Redis();
|
||||
$resource->pconnect($dsnHost, $dsnHost);
|
||||
if($dsnPass) {
|
||||
$resource->auth($dsnPass);
|
||||
}
|
||||
$resource->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, "Invalid scheme");
|
||||
break;
|
||||
}
|
||||
|
||||
// Get Adapter
|
||||
|
||||
switch ($type) {
|
||||
case 'database':
|
||||
$adapter = match ($dsn->getScheme()) {
|
||||
'mariadb' => new MariaDB($resource),
|
||||
'mariadb' => new MySQL($resource),
|
||||
default => null
|
||||
};
|
||||
|
||||
break;
|
||||
case 'queue':
|
||||
//$adapter = new Queue($resource);
|
||||
break;
|
||||
case 'pubsub':
|
||||
//$adapter = new PubSub($resource);
|
||||
break;
|
||||
case 'cache':
|
||||
$adapter = match ($dsn->getScheme()) {
|
||||
'redis' => new RedisCache($resource),
|
||||
default => null
|
||||
};
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, "Server error: Missing adapter implementation.");
|
||||
break;
|
||||
}
|
||||
|
||||
if(is_null($adapter)) {
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, "Server error: Missing adapter implementation.");
|
||||
}
|
||||
|
||||
$pool = new Pool($name, 64, function () use ($adapter) {
|
||||
return new Connection($adapter);
|
||||
});
|
||||
|
||||
$group->add($pool);
|
||||
}
|
||||
}
|
||||
|
||||
$pool = new Pools($consoleDBs, $projectDBs);
|
||||
return $pool;
|
||||
return $group;
|
||||
});
|
||||
|
||||
$register->set('redisPool', function () {
|
||||
$redisHost = App::getEnv('_APP_REDIS_HOST', '');
|
||||
$redisPort = App::getEnv('_APP_REDIS_PORT', '');
|
||||
$redisUser = App::getEnv('_APP_REDIS_USER', '');
|
||||
$redisPass = App::getEnv('_APP_REDIS_PASS', '');
|
||||
$redisAuth = '';
|
||||
|
||||
if ($redisUser && $redisPass) {
|
||||
$redisAuth = $redisUser . ':' . $redisPass;
|
||||
}
|
||||
|
||||
$pool = new RedisPool(
|
||||
(new RedisConfig())
|
||||
->withHost($redisHost)
|
||||
->withPort($redisPort)
|
||||
->withAuth($redisAuth)
|
||||
->withDbIndex(0),
|
||||
64
|
||||
);
|
||||
|
||||
return $pool;
|
||||
});
|
||||
$register->set('influxdb', function () {
|
||||
// Register DB connection
|
||||
$host = App::getEnv('_APP_INFLUXDB_HOST', '');
|
||||
|
@ -594,14 +692,6 @@ $register->set('smtp', function () {
|
|||
$register->set('geodb', function () {
|
||||
return new Reader(__DIR__ . '/db/DBIP/dbip-country-lite-2022-06.mmdb');
|
||||
});
|
||||
$register->set('cache', function () {
|
||||
// This is usually for our workers or CLI commands scope
|
||||
$redis = new Redis();
|
||||
$redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
return $redis;
|
||||
});
|
||||
|
||||
/*
|
||||
* Localization
|
||||
|
@ -899,22 +989,35 @@ App::setResource('console', function () {
|
|||
]);
|
||||
}, []);
|
||||
|
||||
App::setResource('dbForProject', function ($dbPool, $cache, Document $project) {
|
||||
$database = $project->getAttribute('database', '');
|
||||
if (empty($database)) {
|
||||
$database = $dbPool->getConsoleDB();
|
||||
}
|
||||
$pdo = $dbPool->getPDOFromPool($database);
|
||||
$database = Pools::getDatabase($pdo->getConnection(), $cache, "_{$project->getInternalId()}");
|
||||
return $database;
|
||||
}, ['dbPool', 'cache', 'project']);
|
||||
App::setResource('dbForProject', function (Group $pools, Cache $cache, Document $project) {
|
||||
$dbAdapter = $pools
|
||||
->get($project->getAttribute('database', 'console'))
|
||||
->pop()
|
||||
->getResource()
|
||||
;
|
||||
|
||||
$database = new Database($dbAdapter, $cache);
|
||||
|
||||
$database->setNamespace("_{$project->getInternalId()}");
|
||||
$database->setDefaultDatabase('appwrite');
|
||||
|
||||
App::setResource('dbForConsole', function ($dbPool, $cache) {
|
||||
$database = $dbPool->getConsoleDB();
|
||||
$pdo = $dbPool->getPDOFromPool($database);
|
||||
$database = Pools::getDatabase($pdo->getConnection(), $cache, '_console');
|
||||
return $database;
|
||||
}, ['dbPool', 'cache']);
|
||||
}, ['pools', 'cache', 'project']);
|
||||
|
||||
App::setResource('dbForConsole', function (Group $pools, Cache $cache) {
|
||||
$dbAdapter = $pools
|
||||
->get('console')
|
||||
->pop()
|
||||
->getResource()
|
||||
;
|
||||
|
||||
$database = new Database($dbAdapter, $cache);
|
||||
|
||||
$database->setNamespace('console');
|
||||
$database->setDefaultDatabase('appwrite');
|
||||
|
||||
return $database;
|
||||
}, ['pools', 'cache']);
|
||||
|
||||
App::setResource('deviceLocal', function () {
|
||||
return new Local();
|
||||
|
|
|
@ -122,7 +122,7 @@ class Pools
|
|||
*/
|
||||
public function getPDOFromPool(string $name): PDOWrapper
|
||||
{
|
||||
$pool = $this->pools[$name] ?? throw new Exception("Database pool with name : $name not found. Check the value of _APP_DB_PROJECT in .env", 500);
|
||||
$pool = $this->pools[$name] ?? throw new Exception("Database pool with name : $name not found. Check the value of _APP_CONNECTIONS_DB_PROJECT in .env", 500);
|
||||
$pdo = $pool->get();
|
||||
return $pdo;
|
||||
}
|
||||
|
@ -135,7 +135,7 @@ class Pools
|
|||
public function getAnyFromPool(): PDOWrapper
|
||||
{
|
||||
$name = array_rand($this->pools);
|
||||
$pool = $this->pools[$name] ?? throw new Exception("Database pool with name : $name not found. Check the value of _APP_DB_PROJECT in .env", 500);
|
||||
$pool = $this->pools[$name] ?? throw new Exception("Database pool with name : $name not found. Check the value of _APP_CONNECTIONS_DB_PROJECT in .env", 500);
|
||||
$pdo = $pool->get();
|
||||
return $pdo;
|
||||
}
|
||||
|
|
|
@ -1,110 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Extend;
|
||||
|
||||
use PDO as PDONative;
|
||||
|
||||
class PDO extends PDONative
|
||||
{
|
||||
/**
|
||||
* @var PDONative
|
||||
*/
|
||||
protected $pdo;
|
||||
|
||||
/**
|
||||
* @var mixed
|
||||
*/
|
||||
protected $dsn;
|
||||
|
||||
/**
|
||||
* @var mixed
|
||||
*/
|
||||
protected $username;
|
||||
|
||||
/**
|
||||
* @var mixed
|
||||
*/
|
||||
protected $passwd;
|
||||
|
||||
/**
|
||||
* @var mixed
|
||||
*/
|
||||
protected $options;
|
||||
|
||||
/**
|
||||
* Create A Proxy PDO Object
|
||||
*/
|
||||
public function __construct($dsn, $username = null, $passwd = null, $options = null)
|
||||
{
|
||||
$this->dsn = $dsn;
|
||||
$this->username = $username;
|
||||
$this->passwd = $passwd;
|
||||
$this->options = $options;
|
||||
|
||||
$this->pdo = new PDONative($dsn, $username, $passwd, $options);
|
||||
}
|
||||
|
||||
public function setAttribute($attribute, $value)
|
||||
{
|
||||
return $this->pdo->setAttribute($attribute, $value);
|
||||
}
|
||||
|
||||
public function prepare($statement, $driver_options = null)
|
||||
{
|
||||
return new PDOStatement($this, $this->pdo->prepare($statement, []));
|
||||
}
|
||||
|
||||
public function quote($string, $parameter_type = PDONative::PARAM_STR)
|
||||
{
|
||||
return $this->pdo->quote($string, $parameter_type);
|
||||
}
|
||||
|
||||
public function beginTransaction()
|
||||
{
|
||||
try {
|
||||
$result = $this->pdo->beginTransaction();
|
||||
} catch (\Throwable $th) {
|
||||
$this->pdo = $this->reconnect();
|
||||
$result = $this->pdo->beginTransaction();
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function rollBack()
|
||||
{
|
||||
try {
|
||||
$result = $this->pdo->rollBack();
|
||||
} catch (\Throwable $th) {
|
||||
$this->pdo = $this->reconnect();
|
||||
return false;
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function commit()
|
||||
{
|
||||
try {
|
||||
$result = $this->pdo->commit();
|
||||
} catch (\Throwable $th) {
|
||||
$this->pdo = $this->reconnect();
|
||||
$result = $this->pdo->commit();
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function reconnect(): PDONative
|
||||
{
|
||||
$this->pdo = new PDONative($this->dsn, $this->username, $this->passwd, $this->options);
|
||||
|
||||
echo '[PDO] MySQL connection restarted' . PHP_EOL;
|
||||
|
||||
// Connection settings
|
||||
$this->pdo->setAttribute(PDONative::ATTR_DEFAULT_FETCH_MODE, PDONative::FETCH_ASSOC); // Return arrays
|
||||
$this->pdo->setAttribute(PDONative::ATTR_ERRMODE, PDONative::ERRMODE_EXCEPTION); // Handle all errors with exceptions
|
||||
|
||||
return $this->pdo;
|
||||
}
|
||||
}
|
|
@ -1,115 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Extend;
|
||||
|
||||
use PDO as PDONative;
|
||||
use PDOStatement as PDOStatementNative;
|
||||
|
||||
class PDOStatement extends PDOStatementNative
|
||||
{
|
||||
/**
|
||||
* @var PDO
|
||||
*/
|
||||
protected $pdo;
|
||||
|
||||
/**
|
||||
* Params
|
||||
*/
|
||||
protected $params = [];
|
||||
|
||||
/**
|
||||
* Values
|
||||
*/
|
||||
protected $values = [];
|
||||
|
||||
/**
|
||||
* Columns
|
||||
*/
|
||||
protected $columns = [];
|
||||
|
||||
/**
|
||||
* @var PDOStatementNative
|
||||
*/
|
||||
protected $PDOStatement;
|
||||
|
||||
public function __construct(PDO &$pdo, PDOStatementNative $PDOStatement)
|
||||
{
|
||||
$this->pdo = &$pdo;
|
||||
$this->PDOStatement = $PDOStatement;
|
||||
}
|
||||
|
||||
public function bindValue($parameter, $value, $data_type = PDONative::PARAM_STR)
|
||||
{
|
||||
$this->values[$parameter] = ['value' => $value, 'data_type' => $data_type];
|
||||
|
||||
$result = $this->PDOStatement->bindValue($parameter, $value, $data_type);
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function bindParam($parameter, &$variable, $data_type = PDONative::PARAM_STR, $length = null, $driver_options = null)
|
||||
{
|
||||
$this->params[$parameter] = ['value' => &$variable, 'data_type' => $data_type, 'length' => $length, 'driver_options' => $driver_options];
|
||||
|
||||
$result = $this->PDOStatement->bindParam($parameter, $variable, $data_type, $length, $driver_options);
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function bindColumn($column, &$param, $type = null, $maxlen = null, $driverdata = null)
|
||||
{
|
||||
$this->columns[$column] = ['param' => &$param, 'type' => $type, 'maxlen' => $maxlen, 'driverdata' => $driverdata];
|
||||
|
||||
$result = $this->PDOStatement->bindColumn($column, $param, $type, $maxlen, $driverdata);
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function execute($input_parameters = null)
|
||||
{
|
||||
try {
|
||||
$result = $this->PDOStatement->execute($input_parameters);
|
||||
} catch (\Throwable $th) {
|
||||
$this->pdo = $this->pdo->reconnect();
|
||||
$this->PDOStatement = $this->pdo->prepare($this->PDOStatement->queryString, []);
|
||||
|
||||
foreach ($this->values as $key => $set) {
|
||||
$this->PDOStatement->bindValue($key, $set['value'], $set['data_type']);
|
||||
}
|
||||
|
||||
foreach ($this->params as $key => $set) {
|
||||
$this->PDOStatement->bindParam($key, $set['variable'], $set['data_type'], $set['length'], $set['driver_options']);
|
||||
}
|
||||
|
||||
foreach ($this->columns as $key => $set) {
|
||||
$this->PDOStatement->bindColumn($key, $set['param'], $set['type'], $set['maxlen'], $set['driverdata']);
|
||||
}
|
||||
|
||||
$result = $this->PDOStatement->execute($input_parameters);
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function fetch($fetch_style = PDONative::FETCH_ASSOC, $cursor_orientation = PDONative::FETCH_ORI_NEXT, $cursor_offset = 0)
|
||||
{
|
||||
$result = $this->PDOStatement->fetch($fetch_style, $cursor_orientation, $cursor_offset);
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch All
|
||||
*
|
||||
* @param int $fetch_style
|
||||
* @param mixed $fetch_args
|
||||
*
|
||||
* @return array|false
|
||||
*/
|
||||
public function fetchAll(int $fetch_style = PDO::FETCH_BOTH, mixed ...$fetch_args)
|
||||
{
|
||||
$result = $this->PDOStatement->fetchAll();
|
||||
|
||||
return $result;
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue