1
0
Fork 0
mirror of synced 2024-06-29 11:40:45 +12:00

Migrate Database worker to Utopia Queue system

This commit is contained in:
Bradley Schofield 2022-12-13 15:54:01 +00:00
parent 0c759e958d
commit 6e6a6d5518
5 changed files with 398 additions and 320 deletions

View file

@ -3,6 +3,7 @@
require_once __DIR__ . '/init.php';
require_once __DIR__ . '/controllers/general.php';
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Func;
use Appwrite\Platform\Appwrite;
use Utopia\CLI\CLI;
@ -114,6 +115,10 @@ CLI::setResource('queueForFunctions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());
}, ['pools']);
CLI::setResource('database', function (Group $pools) {
return new EventDatabase($pools->get('queue')->pop()->getResource());
}, ['pools']);
CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');

View file

@ -850,7 +850,9 @@ App::setResource('events', fn() => new Event('', ''));
App::setResource('audits', fn() => new Audit());
App::setResource('mails', fn() => new Mail());
App::setResource('deletes', fn() => new Delete());
App::setResource('database', fn() => new EventDatabase());
App::setResource('database', function (Group $pools) {
return new EventDatabase($pools->get('queue')->pop()->getResource());
}, ['pools']);
App::setResource('messaging', fn() => new Phone());
App::setResource('queueForFunctions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());

View file

@ -2,6 +2,7 @@
require_once __DIR__ . '/init.php';
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Func;
use Swoole\Runtime;
use Utopia\App;
@ -75,6 +76,16 @@ Server::setResource('cache', function (Registry $register) {
return new Cache(new Sharding($adapters));
}, ['register']);
Server::setResource('database', function (Registry $register) {
$pools = $register->get('pools');
return new EventDatabase(
$pools
->get('queue')
->pop()
->getResource()
);
}, ['register']);
Server::setResource('queueForFunctions', function (Registry $register) {
$pools = $register->get('pools');
return new Func(

View file

@ -2,72 +2,90 @@
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Utopia\App;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Message;
use Utopia\Cache\Adapter\Redis as RedisCache;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Config\Config;
use Utopia\Database\Adapter\MariaDB;
use Utopia\Queue\Server;
require_once __DIR__ . '/../init.php';
use function Swoole\Coroutine\Http\get;
Console::title('Database V1 Worker');
Console::success(APP_NAME . ' database worker v1 has started' . "\n");
require_once __DIR__ . '/../worker.php';
class DatabaseV1 extends Worker
Authorization::disable();
Authorization::setDefaultStatus(false);
const DATABASE_PROJECT = 'project';
const DATABASE_CONSOLE = 'console';
function getCache(): Cache
{
public function init(): void
{
global $register;
$pools = $register->get('pools');
/** @var \Utopia\Pools\Group $pools */
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource();
}
public function run(): void
{
$type = $this->args['type'];
$project = new Document($this->args['project']);
$collection = new Document($this->args['collection'] ?? []);
$document = new Document($this->args['document'] ?? []);
$database = new Document($this->args['database'] ?? []);
return new Cache(new Sharding($adapters));
}
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
/**
* Get Project DB
*
* @param Document $project
* @returns Database
*/
function getProjectDB(Document $project): Database
{
global $register;
/** @var \Utopia\Pools\Group $pools */
$pools = $register->get('pools');
if ($project->isEmpty() || $project->getId() === 'console') {
return getConsoleDB();
}
if ($document->isEmpty()) {
throw new Exception('Missing document');
}
$dbAdapter = $pools
->get($project->getAttribute('database'))
->pop()
->getResource()
;
switch (strval($type)) {
case DATABASE_TYPE_CREATE_ATTRIBUTE:
$this->createAttribute($database, $collection, $document, $project);
break;
case DATABASE_TYPE_DELETE_ATTRIBUTE:
$this->deleteAttribute($database, $collection, $document, $project);
break;
case DATABASE_TYPE_CREATE_INDEX:
$this->createIndex($database, $collection, $document, $project);
break;
case DATABASE_TYPE_DELETE_INDEX:
$this->deleteIndex($database, $collection, $document, $project);
break;
$database = new Database($dbAdapter, getCache());
$database->setNamespace('_' . $project->getInternalId());
default:
Console::error('No database operation for type: ' . $type);
break;
}
}
return $database;
}
public function shutdown(): void
{
}
/**
/**
* @param Document $database
* @param Document $collection
* @param Document $attribute
* @param Database $dbForConsole
*
* @param Document $project
*/
protected function createAttribute(Document $database, Document $collection, Document $attribute, Document $project): void
{
$projectId = $project->getId();
$dbForConsole = $this->getConsoleDB();
$dbForProject = $this->getProjectDB($project);
function createDBAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole): void
{
$dbForProject = getProjectDB($project);
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [
'databaseId' => $database->getId(),
@ -90,7 +108,6 @@ class DatabaseV1 extends Worker
$format = $attribute->getAttribute('format', '');
$formatOptions = $attribute->getAttribute('formatOptions', []);
$filters = $attribute->getAttribute('filters', []);
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) {
@ -115,7 +132,7 @@ class DatabaseV1 extends Worker
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'projectId' => $project->getId(),
'databaseId' => $database->getId(),
'collectionId' => $collection->getId()
]
@ -123,19 +140,18 @@ class DatabaseV1 extends Worker
}
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
}
}
/**
/**
* @param Document $database
* @param Document $collection
* @param Document $attribute
* @param Database $dbForConsole
* @param Document $project
*/
protected function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project): void
{
$projectId = $project->getId();
$dbForConsole = $this->getConsoleDB();
$dbForProject = $this->getProjectDB($project);
function deleteDBAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole): void
{
$dbForProject = getProjectDB($project);
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [
'databaseId' => $database->getId(),
@ -145,7 +161,6 @@ class DatabaseV1 extends Worker
$collectionId = $collection->getId();
$key = $attribute->getAttribute('key', '');
$status = $attribute->getAttribute('status', '');
$project = $dbForConsole->getDocument('projects', $projectId);
// possible states at this point:
// - available: should not land in queue; controller flips these to 'deleting'
@ -176,7 +191,7 @@ class DatabaseV1 extends Worker
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'projectId' => $project->getId(),
'databaseId' => $database->getId(),
'collectionId' => $collection->getId()
]
@ -210,8 +225,7 @@ class DatabaseV1 extends Worker
$index
->setAttribute('attributes', $attributes, Document::SET_TYPE_ASSIGN)
->setAttribute('lengths', $lengths, Document::SET_TYPE_ASSIGN)
->setAttribute('orders', $orders, Document::SET_TYPE_ASSIGN)
;
->setAttribute('orders', $orders, Document::SET_TYPE_ASSIGN);
// Check if an index exists with the same attributes and orders
$exists = false;
@ -227,7 +241,7 @@ class DatabaseV1 extends Worker
}
if ($exists) { // Delete the duplicate if created, else update in db
$this->deleteIndex($database, $collection, $index, $project);
deleteIndex($database, $collection, $index, $project, $dbForConsole);
} else {
$dbForProject->updateDocument('indexes', $index->getId(), $index);
}
@ -237,19 +251,18 @@ class DatabaseV1 extends Worker
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
$dbForProject->deleteCachedCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
}
}
/**
/**
* @param Document $database
* @param Document $collection
* @param Document $index
* @param Database $dbForConsole
* @param Document $project
*/
protected function createIndex(Document $database, Document $collection, Document $index, Document $project): void
{
$projectId = $project->getId();
$dbForConsole = $this->getConsoleDB();
$dbForProject = $this->getProjectDB($project);
function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole): void
{
$dbForProject = getProjectDB($project);
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [
'databaseId' => $database->getId(),
@ -262,7 +275,6 @@ class DatabaseV1 extends Worker
$attributes = $index->getAttribute('attributes', []);
$lengths = $index->getAttribute('lengths', []);
$orders = $index->getAttribute('orders', []);
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if (!$dbForProject->createIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $attributes, $lengths, $orders)) {
@ -287,7 +299,7 @@ class DatabaseV1 extends Worker
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'projectId' => $project->getId(),
'databaseId' => $database->getId(),
'collectionId' => $collection->getId()
]
@ -295,19 +307,18 @@ class DatabaseV1 extends Worker
}
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
}
}
/**
/**
* @param Document $database
* @param Document $collection
* @param Document $index
* @param Database $dbForConsole
* @param Document $project
*/
protected function deleteIndex(Document $database, Document $collection, Document $index, Document $project): void
{
$projectId = $project->getId();
$dbForConsole = $this->getConsoleDB();
$dbForProject = $this->getProjectDB($project);
function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole): void
{
$dbForProject = getProjectDB($project);
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [
'databaseId' => $database->getId(),
@ -316,7 +327,6 @@ class DatabaseV1 extends Worker
]);
$key = $index->getAttribute('key');
$status = $index->getAttribute('status', '');
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if ($status !== 'failed' && !$dbForProject->deleteIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) {
@ -341,7 +351,7 @@ class DatabaseV1 extends Worker
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'projectId' => $project->getId(),
'databaseId' => $database->getId(),
'collectionId' => $collection->getId()
]
@ -349,5 +359,51 @@ class DatabaseV1 extends Worker
}
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collection->getId());
}
}
$server->job()
->inject('message')
->inject('dbForProject')
->action(function (Message $message, Database $dbForProject) {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$type = $payload['type'];
$project = new Document($payload['project']);
$collection = new Document($payload['collection'] ?? []);
$document = new Document($payload['document'] ?? []);
$database = new Document($payload['database'] ?? []);
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
}
if ($document->isEmpty()) {
throw new Exception('Missing document');
}
switch (strval($type)) {
case DATABASE_TYPE_CREATE_ATTRIBUTE:
createDBAttribute($database, $collection, $document, $project, $dbForProject);
break;
case DATABASE_TYPE_DELETE_ATTRIBUTE:
deleteDBAttribute($database, $collection, $document, $project, $dbForProject);
break;
case DATABASE_TYPE_CREATE_INDEX:
createIndex($database, $collection, $document, $project, $dbForProject);
break;
case DATABASE_TYPE_DELETE_INDEX:
deleteIndex($database, $collection, $document, $project, $dbForProject);
break;
default:
Console::error('No database operation for type: ' . $type);
break;
}
});
$server->workerStart();
$server->start();

View file

@ -4,6 +4,8 @@ namespace Appwrite\Event;
use Resque;
use Utopia\Database\Document;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Database extends Event
{
@ -12,7 +14,7 @@ class Database extends Event
protected ?Document $collection = null;
protected ?Document $document = null;
public function __construct()
public function __construct(protected Connection $connection)
{
parent::__construct(Event::DATABASE_QUEUE_NAME, Event::DATABASE_CLASS_NAME);
}
@ -104,7 +106,9 @@ class Database extends Event
*/
public function trigger(): string|bool
{
return Resque::enqueue($this->queue, $this->class, [
$client = new Client($this->queue, $this->connection);
return $client->enqueue([
'project' => $this->project,
'user' => $this->user,
'type' => $this->type,