diff --git a/app/cli.php b/app/cli.php index 13709b9b5..d0e3c951b 100644 --- a/app/cli.php +++ b/app/cli.php @@ -3,6 +3,7 @@ require_once __DIR__ . '/init.php'; require_once __DIR__ . '/controllers/general.php'; +use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Func; use Appwrite\Platform\Appwrite; use Utopia\CLI\CLI; @@ -114,6 +115,10 @@ CLI::setResource('queueForFunctions', function (Group $pools) { return new Func($pools->get('queue')->pop()->getResource()); }, ['pools']); +CLI::setResource('database', function (Group $pools) { + return new EventDatabase($pools->get('queue')->pop()->getResource()); +}, ['pools']); + CLI::setResource('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { $logger = $register->get('logger'); diff --git a/app/init.php b/app/init.php index 31f82740b..7463cf3b0 100644 --- a/app/init.php +++ b/app/init.php @@ -850,7 +850,9 @@ App::setResource('events', fn() => new Event('', '')); App::setResource('audits', fn() => new Audit()); App::setResource('mails', fn() => new Mail()); App::setResource('deletes', fn() => new Delete()); -App::setResource('database', fn() => new EventDatabase()); +App::setResource('database', function (Group $pools) { + return new EventDatabase($pools->get('queue')->pop()->getResource()); +}, ['pools']); App::setResource('messaging', fn() => new Phone()); App::setResource('queueForFunctions', function (Group $pools) { return new Func($pools->get('queue')->pop()->getResource()); diff --git a/app/worker.php b/app/worker.php index 42a5f9243..259a2d8c6 100644 --- a/app/worker.php +++ b/app/worker.php @@ -2,6 +2,7 @@ require_once __DIR__ . '/init.php'; +use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Func; use Swoole\Runtime; use Utopia\App; @@ -75,6 +76,16 @@ Server::setResource('cache', function (Registry $register) { return new Cache(new Sharding($adapters)); }, ['register']); +Server::setResource('database', function (Registry $register) { + $pools = $register->get('pools'); + return new EventDatabase( + $pools + ->get('queue') + ->pop() + ->getResource() + ); +}, ['register']); + Server::setResource('queueForFunctions', function (Registry $register) { $pools = $register->get('pools'); return new Func( diff --git a/app/workers/databases.php b/app/workers/databases.php index fece5bc60..e5686203e 100644 --- a/app/workers/databases.php +++ b/app/workers/databases.php @@ -2,28 +2,380 @@ use Appwrite\Event\Event; use Appwrite\Messaging\Adapter\Realtime; -use Appwrite\Resque\Worker; +use Utopia\App; +use Utopia\Cache\Cache; use Utopia\CLI\Console; +use Utopia\Database\Database; use Utopia\Database\Document; +use Utopia\Database\Validator\Authorization; +use Utopia\Queue\Message; +use Utopia\Cache\Adapter\Redis as RedisCache; +use Utopia\Cache\Adapter\Sharding; +use Utopia\Config\Config; +use Utopia\Database\Adapter\MariaDB; +use Utopia\Queue\Server; -require_once __DIR__ . '/../init.php'; +use function Swoole\Coroutine\Http\get; -Console::title('Database V1 Worker'); -Console::success(APP_NAME . ' database worker v1 has started' . "\n"); +require_once __DIR__ . '/../worker.php'; -class DatabaseV1 extends Worker +Authorization::disable(); +Authorization::setDefaultStatus(false); + +const DATABASE_PROJECT = 'project'; +const DATABASE_CONSOLE = 'console'; + +function getCache(): Cache { - public function init(): void - { + global $register; + + $pools = $register->get('pools'); + /** @var \Utopia\Pools\Group $pools */ + + $list = Config::getParam('pools-cache', []); + $adapters = []; + + foreach ($list as $value) { + $adapters[] = $pools + ->get($value) + ->pop() + ->getResource(); } - public function run(): void - { - $type = $this->args['type']; - $project = new Document($this->args['project']); - $collection = new Document($this->args['collection'] ?? []); - $document = new Document($this->args['document'] ?? []); - $database = new Document($this->args['database'] ?? []); + return new Cache(new Sharding($adapters)); +} + +/** + * Get Project DB + * + * @param Document $project + * @returns Database + */ +function getProjectDB(Document $project): Database +{ + global $register; + + /** @var \Utopia\Pools\Group $pools */ + $pools = $register->get('pools'); + + if ($project->isEmpty() || $project->getId() === 'console') { + return getConsoleDB(); + } + + $dbAdapter = $pools + ->get($project->getAttribute('database')) + ->pop() + ->getResource() + ; + + $database = new Database($dbAdapter, getCache()); + $database->setNamespace('_' . $project->getInternalId()); + + return $database; +} + + +/** + * @param Document $database + * @param Document $collection + * @param Document $attribute + * @param Database $dbForConsole + * + * @param Document $project + */ +function createDBAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole): void +{ + $dbForProject = getProjectDB($project); + + $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [ + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId(), + 'attributeId' => $attribute->getId() + ]); + /** + * Fetch attribute from the database, since with Resque float values are loosing informations. + */ + $attribute = $dbForProject->getDocument('attributes', $attribute->getId()); + + $collectionId = $collection->getId(); + $key = $attribute->getAttribute('key', ''); + $type = $attribute->getAttribute('type', ''); + $size = $attribute->getAttribute('size', 0); + $required = $attribute->getAttribute('required', false); + $default = $attribute->getAttribute('default', null); + $signed = $attribute->getAttribute('signed', true); + $array = $attribute->getAttribute('array', false); + $format = $attribute->getAttribute('format', ''); + $formatOptions = $attribute->getAttribute('formatOptions', []); + $filters = $attribute->getAttribute('filters', []); + + try { + if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) { + throw new Exception('Failed to create Attribute'); + } + $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'available')); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'failed')); + } finally { + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $events[0], + payload: $attribute, + project: $project, + ); + + Realtime::send( + projectId: 'console', + payload: $attribute->getArrayCopy(), + events: $events, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'projectId' => $project->getId(), + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId() + ] + ); + } + + $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); +} + +/** + * @param Document $database + * @param Document $collection + * @param Document $attribute + * @param Database $dbForConsole + * @param Document $project + */ +function deleteDBAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole): void +{ + $dbForProject = getProjectDB($project); + + $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [ + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId(), + 'attributeId' => $attribute->getId() + ]); + $collectionId = $collection->getId(); + $key = $attribute->getAttribute('key', ''); + $status = $attribute->getAttribute('status', ''); + + // possible states at this point: + // - available: should not land in queue; controller flips these to 'deleting' + // - processing: hasn't finished creating + // - deleting: was available, in deletion queue for first time + // - failed: attribute was never created + // - stuck: attribute was available but cannot be removed + try { + if ($status !== 'failed' && !$dbForProject->deleteAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) { + throw new Exception('Failed to delete Attribute'); + } + $dbForProject->deleteDocument('attributes', $attribute->getId()); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'stuck')); + } finally { + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $events[0], + payload: $attribute, + project: $project + ); + + Realtime::send( + projectId: 'console', + payload: $attribute->getArrayCopy(), + events: $events, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'projectId' => $project->getId(), + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId() + ] + ); + } + + // The underlying database removes/rebuilds indexes when attribute is removed + // Update indexes table with changes + /** @var Document[] $indexes */ + $indexes = $collection->getAttribute('indexes', []); + + foreach ($indexes as $index) { + /** @var string[] $attributes */ + $attributes = $index->getAttribute('attributes'); + $lengths = $index->getAttribute('lengths'); + $orders = $index->getAttribute('orders'); + + $found = \array_search($key, $attributes); + + if ($found !== false) { + // If found, remove entry from attributes, lengths, and orders + // array_values wraps array_diff to reindex array keys + // when found attribute is removed from array + $attributes = \array_values(\array_diff($attributes, [$attributes[$found]])); + $lengths = \array_values(\array_diff($lengths, [$lengths[$found]])); + $orders = \array_values(\array_diff($orders, [$orders[$found]])); + + if (empty($attributes)) { + $dbForProject->deleteDocument('indexes', $index->getId()); + } else { + $index + ->setAttribute('attributes', $attributes, Document::SET_TYPE_ASSIGN) + ->setAttribute('lengths', $lengths, Document::SET_TYPE_ASSIGN) + ->setAttribute('orders', $orders, Document::SET_TYPE_ASSIGN); + + // Check if an index exists with the same attributes and orders + $exists = false; + foreach ($indexes as $existing) { + if ( + $existing->getAttribute('key') !== $index->getAttribute('key') // Ignore itself + && $existing->getAttribute('attributes') === $index->getAttribute('attributes') + && $existing->getAttribute('orders') === $index->getAttribute('orders') + ) { + $exists = true; + break; + } + } + + if ($exists) { // Delete the duplicate if created, else update in db + deleteIndex($database, $collection, $index, $project, $dbForConsole); + } else { + $dbForProject->updateDocument('indexes', $index->getId(), $index); + } + } + } + } + + $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); + $dbForProject->deleteCachedCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId()); +} + +/** + * @param Document $database + * @param Document $collection + * @param Document $index + * @param Database $dbForConsole + * @param Document $project + */ +function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole): void +{ + $dbForProject = getProjectDB($project); + + $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [ + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId(), + 'indexId' => $index->getId() + ]); + $collectionId = $collection->getId(); + $key = $index->getAttribute('key', ''); + $type = $index->getAttribute('type', ''); + $attributes = $index->getAttribute('attributes', []); + $lengths = $index->getAttribute('lengths', []); + $orders = $index->getAttribute('orders', []); + + try { + if (!$dbForProject->createIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $attributes, $lengths, $orders)) { + throw new Exception('Failed to create Index'); + } + $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'available')); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'failed')); + } finally { + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $events[0], + payload: $index, + project: $project + ); + + Realtime::send( + projectId: 'console', + payload: $index->getArrayCopy(), + events: $events, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'projectId' => $project->getId(), + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId() + ] + ); + } + + $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); +} + +/** + * @param Document $database + * @param Document $collection + * @param Document $index + * @param Database $dbForConsole + * @param Document $project + */ +function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole): void +{ + $dbForProject = getProjectDB($project); + + $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [ + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId(), + 'indexId' => $index->getId() + ]); + $key = $index->getAttribute('key'); + $status = $index->getAttribute('status', ''); + + try { + if ($status !== 'failed' && !$dbForProject->deleteIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) { + throw new Exception('Failed to delete index'); + } + $dbForProject->deleteDocument('indexes', $index->getId()); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'stuck')); + } finally { + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $events[0], + payload: $index, + project: $project + ); + + Realtime::send( + projectId: 'console', + payload: $index->getArrayCopy(), + events: $events, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'projectId' => $project->getId(), + 'databaseId' => $database->getId(), + 'collectionId' => $collection->getId() + ] + ); + } + + $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collection->getId()); +} + +$server->job() + ->inject('message') + ->inject('dbForProject') + ->action(function (Message $message, Database $dbForProject) { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $type = $payload['type']; + $project = new Document($payload['project']); + $collection = new Document($payload['collection'] ?? []); + $document = new Document($payload['document'] ?? []); + $database = new Document($payload['database'] ?? []); if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -35,319 +387,23 @@ class DatabaseV1 extends Worker switch (strval($type)) { case DATABASE_TYPE_CREATE_ATTRIBUTE: - $this->createAttribute($database, $collection, $document, $project); + createDBAttribute($database, $collection, $document, $project, $dbForProject); break; case DATABASE_TYPE_DELETE_ATTRIBUTE: - $this->deleteAttribute($database, $collection, $document, $project); + deleteDBAttribute($database, $collection, $document, $project, $dbForProject); break; case DATABASE_TYPE_CREATE_INDEX: - $this->createIndex($database, $collection, $document, $project); + createIndex($database, $collection, $document, $project, $dbForProject); break; case DATABASE_TYPE_DELETE_INDEX: - $this->deleteIndex($database, $collection, $document, $project); + deleteIndex($database, $collection, $document, $project, $dbForProject); break; default: Console::error('No database operation for type: ' . $type); break; } - } + }); - public function shutdown(): void - { - } - - /** - * @param Document $database - * @param Document $collection - * @param Document $attribute - * @param Document $project - */ - protected function createAttribute(Document $database, Document $collection, Document $attribute, Document $project): void - { - $projectId = $project->getId(); - $dbForConsole = $this->getConsoleDB(); - $dbForProject = $this->getProjectDB($project); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'attributeId' => $attribute->getId() - ]); - /** - * Fetch attribute from the database, since with Resque float values are loosing informations. - */ - $attribute = $dbForProject->getDocument('attributes', $attribute->getId()); - - $collectionId = $collection->getId(); - $key = $attribute->getAttribute('key', ''); - $type = $attribute->getAttribute('type', ''); - $size = $attribute->getAttribute('size', 0); - $required = $attribute->getAttribute('required', false); - $default = $attribute->getAttribute('default', null); - $signed = $attribute->getAttribute('signed', true); - $array = $attribute->getAttribute('array', false); - $format = $attribute->getAttribute('format', ''); - $formatOptions = $attribute->getAttribute('formatOptions', []); - $filters = $attribute->getAttribute('filters', []); - $project = $dbForConsole->getDocument('projects', $projectId); - - try { - if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) { - throw new Exception('Failed to create Attribute'); - } - $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'available')); - } catch (\Throwable $th) { - Console::error($th->getMessage()); - $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'failed')); - } finally { - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $events[0], - payload: $attribute, - project: $project, - ); - - Realtime::send( - projectId: 'console', - payload: $attribute->getArrayCopy(), - events: $events, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'projectId' => $projectId, - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId() - ] - ); - } - - $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); - } - - /** - * @param Document $database - * @param Document $collection - * @param Document $attribute - * @param Document $project - */ - protected function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project): void - { - $projectId = $project->getId(); - $dbForConsole = $this->getConsoleDB(); - $dbForProject = $this->getProjectDB($project); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'attributeId' => $attribute->getId() - ]); - $collectionId = $collection->getId(); - $key = $attribute->getAttribute('key', ''); - $status = $attribute->getAttribute('status', ''); - $project = $dbForConsole->getDocument('projects', $projectId); - - // possible states at this point: - // - available: should not land in queue; controller flips these to 'deleting' - // - processing: hasn't finished creating - // - deleting: was available, in deletion queue for first time - // - failed: attribute was never created - // - stuck: attribute was available but cannot be removed - try { - if ($status !== 'failed' && !$dbForProject->deleteAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) { - throw new Exception('Failed to delete Attribute'); - } - $dbForProject->deleteDocument('attributes', $attribute->getId()); - } catch (\Throwable $th) { - Console::error($th->getMessage()); - $dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'stuck')); - } finally { - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $events[0], - payload: $attribute, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $attribute->getArrayCopy(), - events: $events, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'projectId' => $projectId, - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId() - ] - ); - } - - // The underlying database removes/rebuilds indexes when attribute is removed - // Update indexes table with changes - /** @var Document[] $indexes */ - $indexes = $collection->getAttribute('indexes', []); - - foreach ($indexes as $index) { - /** @var string[] $attributes */ - $attributes = $index->getAttribute('attributes'); - $lengths = $index->getAttribute('lengths'); - $orders = $index->getAttribute('orders'); - - $found = \array_search($key, $attributes); - - if ($found !== false) { - // If found, remove entry from attributes, lengths, and orders - // array_values wraps array_diff to reindex array keys - // when found attribute is removed from array - $attributes = \array_values(\array_diff($attributes, [$attributes[$found]])); - $lengths = \array_values(\array_diff($lengths, [$lengths[$found]])); - $orders = \array_values(\array_diff($orders, [$orders[$found]])); - - if (empty($attributes)) { - $dbForProject->deleteDocument('indexes', $index->getId()); - } else { - $index - ->setAttribute('attributes', $attributes, Document::SET_TYPE_ASSIGN) - ->setAttribute('lengths', $lengths, Document::SET_TYPE_ASSIGN) - ->setAttribute('orders', $orders, Document::SET_TYPE_ASSIGN) - ; - - // Check if an index exists with the same attributes and orders - $exists = false; - foreach ($indexes as $existing) { - if ( - $existing->getAttribute('key') !== $index->getAttribute('key') // Ignore itself - && $existing->getAttribute('attributes') === $index->getAttribute('attributes') - && $existing->getAttribute('orders') === $index->getAttribute('orders') - ) { - $exists = true; - break; - } - } - - if ($exists) { // Delete the duplicate if created, else update in db - $this->deleteIndex($database, $collection, $index, $project); - } else { - $dbForProject->updateDocument('indexes', $index->getId(), $index); - } - } - } - } - - $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); - $dbForProject->deleteCachedCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId()); - } - - /** - * @param Document $database - * @param Document $collection - * @param Document $index - * @param Document $project - */ - protected function createIndex(Document $database, Document $collection, Document $index, Document $project): void - { - $projectId = $project->getId(); - $dbForConsole = $this->getConsoleDB(); - $dbForProject = $this->getProjectDB($project); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'indexId' => $index->getId() - ]); - $collectionId = $collection->getId(); - $key = $index->getAttribute('key', ''); - $type = $index->getAttribute('type', ''); - $attributes = $index->getAttribute('attributes', []); - $lengths = $index->getAttribute('lengths', []); - $orders = $index->getAttribute('orders', []); - $project = $dbForConsole->getDocument('projects', $projectId); - - try { - if (!$dbForProject->createIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $attributes, $lengths, $orders)) { - throw new Exception('Failed to create Index'); - } - $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'available')); - } catch (\Throwable $th) { - Console::error($th->getMessage()); - $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'failed')); - } finally { - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $events[0], - payload: $index, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $index->getArrayCopy(), - events: $events, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'projectId' => $projectId, - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId() - ] - ); - } - - $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId); - } - - /** - * @param Document $database - * @param Document $collection - * @param Document $index - * @param Document $project - */ - protected function deleteIndex(Document $database, Document $collection, Document $index, Document $project): void - { - $projectId = $project->getId(); - $dbForConsole = $this->getConsoleDB(); - $dbForProject = $this->getProjectDB($project); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'indexId' => $index->getId() - ]); - $key = $index->getAttribute('key'); - $status = $index->getAttribute('status', ''); - $project = $dbForConsole->getDocument('projects', $projectId); - - try { - if ($status !== 'failed' && !$dbForProject->deleteIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) { - throw new Exception('Failed to delete index'); - } - $dbForProject->deleteDocument('indexes', $index->getId()); - } catch (\Throwable $th) { - Console::error($th->getMessage()); - $dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'stuck')); - } finally { - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $events[0], - payload: $index, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $index->getArrayCopy(), - events: $events, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'projectId' => $projectId, - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId() - ] - ); - } - - $dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collection->getId()); - } -} +$server->workerStart(); +$server->start(); diff --git a/src/Appwrite/Event/Database.php b/src/Appwrite/Event/Database.php index 1822f06c7..2c6aaea92 100644 --- a/src/Appwrite/Event/Database.php +++ b/src/Appwrite/Event/Database.php @@ -4,6 +4,8 @@ namespace Appwrite\Event; use Resque; use Utopia\Database\Document; +use Utopia\Queue\Client; +use Utopia\Queue\Connection; class Database extends Event { @@ -12,7 +14,7 @@ class Database extends Event protected ?Document $collection = null; protected ?Document $document = null; - public function __construct() + public function __construct(protected Connection $connection) { parent::__construct(Event::DATABASE_QUEUE_NAME, Event::DATABASE_CLASS_NAME); } @@ -104,7 +106,9 @@ class Database extends Event */ public function trigger(): string|bool { - return Resque::enqueue($this->queue, $this->class, [ + $client = new Client($this->queue, $this->connection); + + return $client->enqueue([ 'project' => $this->project, 'user' => $this->user, 'type' => $this->type,