From 8bcf349c3814c996abebd12740c2ffe45a71cd5a Mon Sep 17 00:00:00 2001 From: shimon Date: Tue, 15 Nov 2022 18:03:42 +0200 Subject: [PATCH] addressing some comments --- app/controllers/api/functions.php | 10 +++++----- app/init.php | 17 +++++------------ app/worker.php | 4 +--- app/workers/functions.php | 21 +++++++++++---------- 4 files changed, 22 insertions(+), 30 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 5b969e63d8..6f09e27385 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -41,7 +41,7 @@ use Utopia\CLI\Console; use Utopia\Database\Validator\Roles; use Utopia\Validator\Boolean; use Utopia\Database\Exception\Duplicate as DuplicateException; -use Utopia\Queue\Client as queue; +use Utopia\Queue\Client as QueueClient; include_once __DIR__ . '/../shared/api.php'; @@ -1155,10 +1155,10 @@ App::post('/v1/functions/:functionId/executions') ->setContext('function', $function); if ($async) { - $queue = new queue(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource()); - $queue->enqueue([ - 'type' => 'http', - 'value' => [ + $queueForFunctions = new QueueClient(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource()); + $queueForFunctions->enqueue([ + 'type' => 'http', + 'value' => [ 'type' => 'http', 'execution' => $execution, 'function' => $function, diff --git a/app/init.php b/app/init.php index 4500526583..c5e898d836 100644 --- a/app/init.php +++ b/app/init.php @@ -38,8 +38,6 @@ use Appwrite\Network\Validator\IP; use Appwrite\Network\Validator\URL; use Appwrite\OpenSSL\OpenSSL; use Appwrite\URL\URL as AppwriteURL; -use Utopia\Queue\Client as SyncOut; -use Utopia\Queue\Connection\Redis as QueueRedis; use Appwrite\Usage\Stats; use Appwrite\Utopia\View; use Utopia\App; @@ -67,7 +65,6 @@ use Utopia\Storage\Device\Wasabi; use Utopia\Cache\Adapter\Redis as RedisCache; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; -use Utopia\CLI\Console; use Utopia\Database\Adapter\MariaDB; use Utopia\Database\Adapter\MySQL; use Utopia\Pools\Group; @@ -526,35 +523,30 @@ $register->set('pools', function () { 'dsns' => App::getEnv('_APP_CONNECTIONS_DB_CONSOLE', $fallbackForDB), 'multiple' => false, 'schemes' => ['mariadb', 'mysql'], - 'useResource' => true, ], 'database' => [ 'type' => 'database', 'dsns' => App::getEnv('_APP_CONNECTIONS_DB_PROJECT', $fallbackForDB), 'multiple' => true, 'schemes' => ['mariadb', 'mysql'], - 'useResource' => true, ], 'queue' => [ 'type' => 'queue', 'dsns' => App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis), 'multiple' => false, 'schemes' => ['redis'], - 'useResource' => false, ], 'pubsub' => [ 'type' => 'pubsub', 'dsns' => App::getEnv('_APP_CONNECTIONS_PUBSUB', $fallbackForRedis), 'multiple' => false, 'schemes' => ['redis'], - 'useResource' => true, ], 'cache' => [ 'type' => 'cache', 'dsns' => App::getEnv('_APP_CONNECTIONS_CACHE', $fallbackForRedis), 'multiple' => true, 'schemes' => ['redis'], - 'useResource' => true, ], ]; @@ -586,7 +578,7 @@ $register->set('pools', function () { $dsnScheme = $dsn->getScheme(); $dsnDatabase = $dsn->getDatabase(); - if (!in_array($dsnScheme, $schemes) && $useResource) { + if (!in_array($dsnScheme, $schemes)) { throw new Exception(Exception::GENERAL_SERVER_ERROR, "Invalid console database scheme"); } @@ -647,12 +639,12 @@ $register->set('pools', function () { $adapter->setDefaultDatabase($dsn->getDatabase()); break; case 'pubsub': - break; $adapter = $resource(); + break; case 'queue': $adapter = match ($dsn->getScheme()) { 'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()), - default => 'bla' + default => null }; break; case 'cache': @@ -678,6 +670,7 @@ $register->set('pools', function () { return $group; }); + $register->set('influxdb', function () { // Register DB connection $host = App::getEnv('_APP_INFLUXDB_HOST', ''); @@ -853,7 +846,7 @@ App::setResource('messaging', fn() => new Phone()); App::setResource('usage', function ($register) { return new Stats($register->get('statsd')); }, ['register']); -App::setResource('clients', function ($request, $console, $project) use ($register) { +App::setResource('clients', function ($request, $console, $project) { $console->setAttribute('platforms', [ // Always allow current host '$collection' => ID::custom('platforms'), 'name' => 'Current Host', diff --git a/app/worker.php b/app/worker.php index bb35576172..4e484e1545 100644 --- a/app/worker.php +++ b/app/worker.php @@ -2,7 +2,6 @@ require_once __DIR__ . '/init.php'; - use Swoole\Runtime; use Utopia\App; use Utopia\Cache\Adapter\Sharding; @@ -14,7 +13,6 @@ use Utopia\Queue\Message; use Utopia\Queue\Server; use Utopia\Registry\Registry; - global $register; Server::setResource('register', fn() => $register); @@ -76,7 +74,7 @@ App::setResource('logger', function ($register) { $pools = $register->get('pools'); -$client = $pools->get('queue')->pop()->getResource(); +$connection = $pools->get('queue')->pop()->getResource(); $workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)); diff --git a/app/workers/functions.php b/app/workers/functions.php index 86fc2998b3..91edee19fe 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -20,11 +20,12 @@ use Utopia\Database\Query; use Utopia\Database\Role; use Utopia\Database\Validator\Authorization; use Utopia\Logger\Log; +use Utopia\Queue\Client as QueueClient; Authorization::disable(); Authorization::setDefaultStatus(false); -global $client; +global $connection; global $workerNumber; $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); @@ -40,7 +41,7 @@ $execute = function ( string $data = null, ?Document $user = null, string $jwt = null -) use ($executor) { +) use ($executor, $register) { $user ??= new Document(); $functionId = $function->getId(); @@ -50,28 +51,28 @@ $execute = function ( $deployment = $dbForProject->getDocument('deployments', $deploymentId); if ($deployment->getAttribute('resourceId') !== $functionId) { - throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); + throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } if ($deployment->isEmpty()) { - throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); + throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } /** Check if build has exists */ $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); if ($build->isEmpty()) { - throw new Exception('Build not found', 404); + throw new Exception('Build not found'); } if ($build->getAttribute('status') !== 'ready') { - throw new Exception('Build not ready', 400); + throw new Exception('Build not ready'); } /** Check if runtime is supported */ $runtimes = Config::getParam('runtimes', []); if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { - throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400); + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } $runtime = $runtimes[$function->getAttribute('runtime')]; @@ -102,14 +103,14 @@ $execute = function ( $execution = $dbForProject->updateDocument('executions', $executionId, $execution); if ($build->getAttribute('status') !== 'ready') { - throw new Exception('Build not ready', 400); + throw new Exception('Build not ready'); } /** Check if runtime is supported */ $runtimes = Config::getParam('runtimes', []); if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { - throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400); + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } $runtime = $runtimes[$function->getAttribute('runtime')]; @@ -254,7 +255,7 @@ $execute = function ( } }; -$adapter = new Queue\Adapter\Swoole($client, $workerNumber, Event::FUNCTIONS_QUEUE_NAME); +$adapter = new Queue\Adapter\Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME); $server = new Queue\Server($adapter); $server->job()