1
0
Fork 0
mirror of synced 2024-06-01 18:39:57 +12:00

fix: all of the workers

This commit is contained in:
Torsten Dittmann 2022-06-20 11:22:53 +02:00
parent 52a857274d
commit 3aa49f773a
11 changed files with 78 additions and 80 deletions

View file

@ -292,11 +292,10 @@ App::init(function (App $utopia, Request $request, Response $response, Document
$service = $route->getLabel('sdk.namespace', '');
if (!empty($service)) {
$roles = Authorization::getRoles();
if (
array_key_exists($service, $project->getAttribute('services', []))
&& !$project->getAttribute('services', [])[$service]
&& !(Auth::isPrivilegedUser($roles) || Auth::isAppUser($roles))
&& !(Auth::isPrivilegedUser(Authorization::getRoles()) || Auth::isAppUser(Authorization::getRoles()))
) {
throw new AppwriteException('Service is disabled', 503, AppwriteException::GENERAL_SERVICE_DISABLED);
}

View file

@ -839,6 +839,7 @@ App::setResource('project', function ($dbForConsole, $request, $console) {
App::setResource('console', function () {
return new Document([
'$id' => 'console',
'$internalId' => 'console',
'name' => 'Appwrite',
'$collection' => 'projects',
'description' => 'Appwrite core engine',

View file

@ -298,7 +298,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
[$database, $returnDatabase] = getDatabase($register, "_{$projectId}");
[$consoleDatabase, $returnConsoleDatabase] = getDatabase($register, '_console');
$project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId));
[$database, $returnDatabase] = getDatabase($register, "_{$project->getInternalId()}");
$user = $database->getDocument('users', $userId);
@ -307,6 +309,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
call_user_func($returnDatabase);
call_user_func($returnConsoleDatabase);
}
}
@ -481,7 +484,9 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
$cache = new Cache(new RedisCache($redis));
$database = new Database(new MariaDB($db), $cache);
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$database->setNamespace("_{$realtime->connections[$connection]['projectId']}");
$database->setNamespace("_console");
$project = Authorization::skip(fn() => $database->getDocument('projects', $realtime->connections[$connection]['projectId']));
$database->setNamespace("_{$project->getInternalId()}");
/*
* Abuse Check

View file

@ -15,9 +15,6 @@ use Utopia\Config\Config;
require_once __DIR__ . '/../init.php';
// Disable Auth since we already validate it in the API
Authorization::disable();
Console::title('Builds V1 Worker');
Console::success(APP_NAME . ' build worker v1 has started');

View file

@ -36,8 +36,6 @@ class CertificatesV1 extends Worker
public function run(): void
{
Authorization::disable();
Authorization::setDefaultStatus(false);
/**
* 1. Read arguments and validate domain
* 2. Get main domain

View file

@ -20,8 +20,6 @@ class DatabaseV1 extends Worker
public function run(): void
{
Authorization::disable();
$type = $this->args['type'];
$project = new Document($this->args['project']);
$collection = new Document($this->args['collection'] ?? []);
@ -53,8 +51,6 @@ class DatabaseV1 extends Worker
Console::error('No database operation for type: ' . $type);
break;
}
Authorization::reset();
}
public function shutdown(): void

View file

@ -15,9 +15,6 @@ use Utopia\Audit\Audit;
require_once __DIR__ . '/../init.php';
Authorization::disable();
Authorization::setDefaultStatus(false);
Console::title('Deletes V1 Worker');
Console::success(APP_NAME . ' deletes worker v1 has started' . "\n");

View file

@ -44,6 +44,10 @@ class FunctionsV1 extends Worker
$user = new Document($this->args['user'] ?? []);
$payload = json_encode($this->args['payload'] ?? []);
if ($project->getId() === 'console') {
return;
}
$database = $this->getProjectDB($project->getId());
/**
@ -57,7 +61,7 @@ class FunctionsV1 extends Worker
/** @var Document[] $functions */
while ($sum >= $limit) {
$functions = Authorization::skip(fn () => $database->find('functions', [], $limit, $offset, ['name'], [Database::ORDER_ASC]));
$functions = $database->find('functions', [], $limit, $offset, ['name'], [Database::ORDER_ASC]);
$sum = \count($functions);
$offset = $offset + $limit;
@ -101,7 +105,7 @@ class FunctionsV1 extends Worker
$jwt = $this->args['jwt'] ?? '';
$data = $this->args['data'] ?? '';
$function = Authorization::skip(fn () => $database->getDocument('functions', $execution->getAttribute('functionId')));
$function = $database->getDocument('functions', $execution->getAttribute('functionId'));
$this->execute(
project: $project,
@ -132,7 +136,7 @@ class FunctionsV1 extends Worker
*/
// Reschedule
$function = Authorization::skip(fn () => $database->getDocument('functions', $function->getId()));
$function = $database->getDocument('functions', $function->getId());
if (empty($function->getId())) {
throw new Exception('Function not found (' . $function->getId() . ')');
@ -149,11 +153,11 @@ class FunctionsV1 extends Worker
->setAttribute('scheduleNext', $next)
->setAttribute('schedulePrevious', \time());
$function = Authorization::skip(fn () => $database->updateDocument(
$function = $database->updateDocument(
'functions',
$function->getId(),
$function->setAttribute('scheduleNext', (int) $next)
));
);
if ($function === false) {
throw new Exception('Function update failed.');
@ -198,7 +202,7 @@ class FunctionsV1 extends Worker
$deploymentId = $function->getAttribute('deployment', '');
/** Check if deployment exists */
$deployment = Authorization::skip(fn () => $dbForProject->getDocument('deployments', $deploymentId));
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
@ -209,7 +213,7 @@ class FunctionsV1 extends Worker
}
/** Check if build has exists */
$build = Authorization::skip(fn () => $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')));
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
if ($build->isEmpty()) {
throw new Exception('Build not found', 404);
}
@ -228,35 +232,31 @@ class FunctionsV1 extends Worker
$runtime = $runtimes[$function->getAttribute('runtime')];
/** Create execution or update execution status */
$execution = Authorization::skip(function () use ($dbForProject, &$executionId, $functionId, $deploymentId, $trigger, $user) {
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) {
$executionId = $dbForProject->getId();
$execution = $dbForProject->createDocument('executions', new Document([
'$id' => $executionId,
'$read' => $user->isEmpty() ? [] : ['user:' . $user->getId()],
'$write' => [],
'dateCreated' => time(),
'functionId' => $functionId,
'deploymentId' => $deploymentId,
'trigger' => $trigger,
'status' => 'waiting',
'statusCode' => 0,
'response' => '',
'stderr' => '',
'time' => 0.0,
'search' => implode(' ', [$functionId, $executionId]),
]));
if ($execution->isEmpty()) {
$executionId = $dbForProject->getId();
$execution = $dbForProject->createDocument('executions', new Document([
'$id' => $executionId,
'$read' => $user->isEmpty() ? [] : ['user:' . $user->getId()],
'$write' => [],
'dateCreated' => time(),
'functionId' => $functionId,
'deploymentId' => $deploymentId,
'trigger' => $trigger,
'status' => 'waiting',
'statusCode' => 0,
'response' => '',
'stderr' => '',
'time' => 0.0,
'search' => implode(' ', [$functionId, $executionId]),
]));
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
throw new Exception('Failed to create or read execution');
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
return $execution;
});
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
/** Collect environment variables */
$vars = [
@ -307,8 +307,7 @@ class FunctionsV1 extends Worker
Console::error($th->getMessage());
}
$execution = Authorization::skip(fn () => $dbForProject->updateDocument('executions', $executionId, $execution));
/** @var Document $execution */
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
/** Trigger Webhook */
$executionModel = new Execution();

View file

@ -45,7 +45,7 @@
"utopia-php/cache": "0.6.*",
"utopia-php/cli": "0.12.*",
"utopia-php/config": "0.2.*",
"utopia-php/database": "dev-bug-last-internal-id as 0.18.1",
"utopia-php/database": "0.18.*",
"utopia-php/locale": "0.4.*",
"utopia-php/registry": "0.5.*",
"utopia-php/preloader": "0.2.*",

39
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "2fb5e41dc214b996d95f49a49d64d00e",
"content-hash": "1593c7304ba026d4073de336227858f3",
"packages": [
{
"name": "adhocore/jwt",
@ -1583,16 +1583,16 @@
},
{
"name": "squizlabs/php_codesniffer",
"version": "3.7.0",
"version": "3.7.1",
"source": {
"type": "git",
"url": "https://github.com/squizlabs/PHP_CodeSniffer.git",
"reference": "a2cd51b45bcaef9c1f2a4bda48f2dd2fa2b95563"
"reference": "1359e176e9307e906dc3d890bcc9603ff6d90619"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/squizlabs/PHP_CodeSniffer/zipball/a2cd51b45bcaef9c1f2a4bda48f2dd2fa2b95563",
"reference": "a2cd51b45bcaef9c1f2a4bda48f2dd2fa2b95563",
"url": "https://api.github.com/repos/squizlabs/PHP_CodeSniffer/zipball/1359e176e9307e906dc3d890bcc9603ff6d90619",
"reference": "1359e176e9307e906dc3d890bcc9603ff6d90619",
"shasum": ""
},
"require": {
@ -1635,7 +1635,7 @@
"source": "https://github.com/squizlabs/PHP_CodeSniffer",
"wiki": "https://github.com/squizlabs/PHP_CodeSniffer/wiki"
},
"time": "2022-06-13T06:31:38+00:00"
"time": "2022-06-18T07:21:10+00:00"
},
{
"name": "symfony/deprecation-contracts",
@ -2107,16 +2107,16 @@
},
{
"name": "utopia-php/database",
"version": "dev-bug-last-internal-id",
"version": "0.18.2",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/database.git",
"reference": "ef5bd5a1e761e04352a64e39b5b1b37e7e7a36cc"
"reference": "781c31238b03ebc530a225973c4d1a921e00c2b9"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/database/zipball/ef5bd5a1e761e04352a64e39b5b1b37e7e7a36cc",
"reference": "ef5bd5a1e761e04352a64e39b5b1b37e7e7a36cc",
"url": "https://api.github.com/repos/utopia-php/database/zipball/781c31238b03ebc530a225973c4d1a921e00c2b9",
"reference": "781c31238b03ebc530a225973c4d1a921e00c2b9",
"shasum": ""
},
"require": {
@ -2165,9 +2165,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/database/issues",
"source": "https://github.com/utopia-php/database/tree/bug-last-internal-id"
"source": "https://github.com/utopia-php/database/tree/0.18.2"
},
"time": "2022-06-14T12:42:10+00:00"
"time": "2022-06-19T09:32:07+00:00"
},
{
"name": "utopia-php/domains",
@ -5346,18 +5346,9 @@
"time": "2022-05-17T05:48:52+00:00"
}
],
"aliases": [
{
"package": "utopia-php/database",
"version": "dev-bug-last-internal-id",
"alias": "0.18.1",
"alias_normalized": "0.18.1.0"
}
],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": {
"utopia-php/database": 20
},
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": {
@ -5379,5 +5370,5 @@
"platform-overrides": {
"php": "8.0"
},
"plugin-api-version": "2.2.0"
"plugin-api-version": "2.3.0"
}

View file

@ -17,6 +17,7 @@ use Utopia\Storage\Device\Wasabi;
use Utopia\Storage\Device\Backblaze;
use Utopia\Storage\Device\S3;
use Exception;
use Utopia\Database\Validator\Authorization;
abstract class Worker
{
@ -112,6 +113,11 @@ abstract class Worker
public function perform(): void
{
try {
/**
* Disabling global authorization in workers.
*/
Authorization::disable();
Authorization::setDefaultStatus(false);
$this->run();
} catch (\Throwable $error) {
foreach (self::$errorCallbacks as $errorCallback) {
@ -159,7 +165,16 @@ abstract class Worker
*/
protected function getProjectDB(string $projectId): Database
{
return $this->getDB(self::DATABASE_PROJECT, $projectId);
$consoleDB = $this->getConsoleDB();
if ($projectId === 'console') {
return $consoleDB;
}
/** @var Document $project */
$project = Authorization::skip(fn() => $consoleDB->getDocument('projects', $projectId));
return $this->getDB(self::DATABASE_PROJECT, $projectId, $project->getInternalId());
}
/**
@ -177,7 +192,7 @@ abstract class Worker
* @param string $projectId of internal or external DB
* @return Database
*/
private function getDB($type, $projectId = ''): Database
private function getDB(string $type, string $projectId = '', string $projectInternalId = ''): Database
{
global $register;
@ -189,7 +204,7 @@ abstract class Worker
if (!$projectId) {
throw new \Exception('ProjectID not provided - cannot get database');
}
$namespace = "_{$projectId}";
$namespace = "_{$projectInternalId}";
break;
case self::DATABASE_CONSOLE:
$namespace = "_console";