addressing some comments
This commit is contained in:
parent
134dba0b43
commit
8bcf349c38
4 changed files with 22 additions and 30 deletions
|
@ -41,7 +41,7 @@ use Utopia\CLI\Console;
|
|||
use Utopia\Database\Validator\Roles;
|
||||
use Utopia\Validator\Boolean;
|
||||
use Utopia\Database\Exception\Duplicate as DuplicateException;
|
||||
use Utopia\Queue\Client as queue;
|
||||
use Utopia\Queue\Client as QueueClient;
|
||||
|
||||
include_once __DIR__ . '/../shared/api.php';
|
||||
|
||||
|
@ -1155,8 +1155,8 @@ App::post('/v1/functions/:functionId/executions')
|
|||
->setContext('function', $function);
|
||||
|
||||
if ($async) {
|
||||
$queue = new queue(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource());
|
||||
$queue->enqueue([
|
||||
$queueForFunctions = new QueueClient(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource());
|
||||
$queueForFunctions->enqueue([
|
||||
'type' => 'http',
|
||||
'value' => [
|
||||
'type' => 'http',
|
||||
|
|
17
app/init.php
17
app/init.php
|
@ -38,8 +38,6 @@ use Appwrite\Network\Validator\IP;
|
|||
use Appwrite\Network\Validator\URL;
|
||||
use Appwrite\OpenSSL\OpenSSL;
|
||||
use Appwrite\URL\URL as AppwriteURL;
|
||||
use Utopia\Queue\Client as SyncOut;
|
||||
use Utopia\Queue\Connection\Redis as QueueRedis;
|
||||
use Appwrite\Usage\Stats;
|
||||
use Appwrite\Utopia\View;
|
||||
use Utopia\App;
|
||||
|
@ -67,7 +65,6 @@ use Utopia\Storage\Device\Wasabi;
|
|||
use Utopia\Cache\Adapter\Redis as RedisCache;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Adapter\MariaDB;
|
||||
use Utopia\Database\Adapter\MySQL;
|
||||
use Utopia\Pools\Group;
|
||||
|
@ -526,35 +523,30 @@ $register->set('pools', function () {
|
|||
'dsns' => App::getEnv('_APP_CONNECTIONS_DB_CONSOLE', $fallbackForDB),
|
||||
'multiple' => false,
|
||||
'schemes' => ['mariadb', 'mysql'],
|
||||
'useResource' => true,
|
||||
],
|
||||
'database' => [
|
||||
'type' => 'database',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_DB_PROJECT', $fallbackForDB),
|
||||
'multiple' => true,
|
||||
'schemes' => ['mariadb', 'mysql'],
|
||||
'useResource' => true,
|
||||
],
|
||||
'queue' => [
|
||||
'type' => 'queue',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis),
|
||||
'multiple' => false,
|
||||
'schemes' => ['redis'],
|
||||
'useResource' => false,
|
||||
],
|
||||
'pubsub' => [
|
||||
'type' => 'pubsub',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_PUBSUB', $fallbackForRedis),
|
||||
'multiple' => false,
|
||||
'schemes' => ['redis'],
|
||||
'useResource' => true,
|
||||
],
|
||||
'cache' => [
|
||||
'type' => 'cache',
|
||||
'dsns' => App::getEnv('_APP_CONNECTIONS_CACHE', $fallbackForRedis),
|
||||
'multiple' => true,
|
||||
'schemes' => ['redis'],
|
||||
'useResource' => true,
|
||||
],
|
||||
];
|
||||
|
||||
|
@ -586,7 +578,7 @@ $register->set('pools', function () {
|
|||
$dsnScheme = $dsn->getScheme();
|
||||
$dsnDatabase = $dsn->getDatabase();
|
||||
|
||||
if (!in_array($dsnScheme, $schemes) && $useResource) {
|
||||
if (!in_array($dsnScheme, $schemes)) {
|
||||
throw new Exception(Exception::GENERAL_SERVER_ERROR, "Invalid console database scheme");
|
||||
}
|
||||
|
||||
|
@ -647,12 +639,12 @@ $register->set('pools', function () {
|
|||
$adapter->setDefaultDatabase($dsn->getDatabase());
|
||||
break;
|
||||
case 'pubsub':
|
||||
break;
|
||||
$adapter = $resource();
|
||||
break;
|
||||
case 'queue':
|
||||
$adapter = match ($dsn->getScheme()) {
|
||||
'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()),
|
||||
default => 'bla'
|
||||
default => null
|
||||
};
|
||||
break;
|
||||
case 'cache':
|
||||
|
@ -678,6 +670,7 @@ $register->set('pools', function () {
|
|||
|
||||
return $group;
|
||||
});
|
||||
|
||||
$register->set('influxdb', function () {
|
||||
// Register DB connection
|
||||
$host = App::getEnv('_APP_INFLUXDB_HOST', '');
|
||||
|
@ -853,7 +846,7 @@ App::setResource('messaging', fn() => new Phone());
|
|||
App::setResource('usage', function ($register) {
|
||||
return new Stats($register->get('statsd'));
|
||||
}, ['register']);
|
||||
App::setResource('clients', function ($request, $console, $project) use ($register) {
|
||||
App::setResource('clients', function ($request, $console, $project) {
|
||||
$console->setAttribute('platforms', [ // Always allow current host
|
||||
'$collection' => ID::custom('platforms'),
|
||||
'name' => 'Current Host',
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
require_once __DIR__ . '/init.php';
|
||||
|
||||
|
||||
use Swoole\Runtime;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
|
@ -14,7 +13,6 @@ use Utopia\Queue\Message;
|
|||
use Utopia\Queue\Server;
|
||||
use Utopia\Registry\Registry;
|
||||
|
||||
|
||||
global $register;
|
||||
|
||||
Server::setResource('register', fn() => $register);
|
||||
|
@ -76,7 +74,7 @@ App::setResource('logger', function ($register) {
|
|||
|
||||
|
||||
$pools = $register->get('pools');
|
||||
$client = $pools->get('queue')->pop()->getResource();
|
||||
$connection = $pools->get('queue')->pop()->getResource();
|
||||
|
||||
|
||||
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
|
||||
|
|
|
@ -20,11 +20,12 @@ use Utopia\Database\Query;
|
|||
use Utopia\Database\Role;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Queue\Client as QueueClient;
|
||||
|
||||
Authorization::disable();
|
||||
Authorization::setDefaultStatus(false);
|
||||
|
||||
global $client;
|
||||
global $connection;
|
||||
global $workerNumber;
|
||||
|
||||
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
|
||||
|
@ -40,7 +41,7 @@ $execute = function (
|
|||
string $data = null,
|
||||
?Document $user = null,
|
||||
string $jwt = null
|
||||
) use ($executor) {
|
||||
) use ($executor, $register) {
|
||||
|
||||
$user ??= new Document();
|
||||
$functionId = $function->getId();
|
||||
|
@ -50,28 +51,28 @@ $execute = function (
|
|||
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
|
||||
|
||||
if ($deployment->getAttribute('resourceId') !== $functionId) {
|
||||
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
|
||||
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
|
||||
}
|
||||
|
||||
if ($deployment->isEmpty()) {
|
||||
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
|
||||
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
|
||||
}
|
||||
|
||||
/** Check if build has exists */
|
||||
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
|
||||
if ($build->isEmpty()) {
|
||||
throw new Exception('Build not found', 404);
|
||||
throw new Exception('Build not found');
|
||||
}
|
||||
|
||||
if ($build->getAttribute('status') !== 'ready') {
|
||||
throw new Exception('Build not ready', 400);
|
||||
throw new Exception('Build not ready');
|
||||
}
|
||||
|
||||
/** Check if runtime is supported */
|
||||
$runtimes = Config::getParam('runtimes', []);
|
||||
|
||||
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
|
||||
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
|
||||
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
|
||||
}
|
||||
|
||||
$runtime = $runtimes[$function->getAttribute('runtime')];
|
||||
|
@ -102,14 +103,14 @@ $execute = function (
|
|||
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
|
||||
|
||||
if ($build->getAttribute('status') !== 'ready') {
|
||||
throw new Exception('Build not ready', 400);
|
||||
throw new Exception('Build not ready');
|
||||
}
|
||||
|
||||
/** Check if runtime is supported */
|
||||
$runtimes = Config::getParam('runtimes', []);
|
||||
|
||||
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
|
||||
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
|
||||
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
|
||||
}
|
||||
|
||||
$runtime = $runtimes[$function->getAttribute('runtime')];
|
||||
|
@ -254,7 +255,7 @@ $execute = function (
|
|||
}
|
||||
};
|
||||
|
||||
$adapter = new Queue\Adapter\Swoole($client, $workerNumber, Event::FUNCTIONS_QUEUE_NAME);
|
||||
$adapter = new Queue\Adapter\Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME);
|
||||
$server = new Queue\Server($adapter);
|
||||
|
||||
$server->job()
|
||||
|
|
Loading…
Reference in a new issue