1
0
Fork 0
mirror of synced 2024-06-01 10:29:48 +12:00

migrations worker

This commit is contained in:
shimon 2023-10-01 20:39:26 +03:00
parent 47d27096db
commit b02d51c794
24 changed files with 683 additions and 137 deletions

2
.gitmodules vendored
View file

@ -1,4 +1,4 @@
[submodule "app/console"] [submodule "app/console"]
path = app/console path = app/console
url = https://github.com/appwrite/console url = https://github.com/appwrite/console
branch = 3.1.1 branch = feat-usage-1.4

View file

@ -76,7 +76,7 @@ App::post('/v1/account')
->inject('project') ->inject('project')
->inject('dbForProject') ->inject('dbForProject')
->inject('queueForEvents') ->inject('queueForEvents')
->action(function (string $userId, string $email, string $password, string $name, Request $request, Response $response, Document $project, Database $dbForProject, Event $queueForEvents) { ->action(function (string $userId, string $email, string $password, string $name, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Event $queueForEvents) {
$email = \strtolower($email); $email = \strtolower($email);
if ('console' === $project->getId()) { if ('console' === $project->getId()) {

View file

@ -22,6 +22,7 @@ use Utopia\Database\Database;
use Utopia\Database\DateTime; use Utopia\Database\DateTime;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Database\Exception\Authorization as AuthorizationException; use Utopia\Database\Exception\Authorization as AuthorizationException;
use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Duplicate as DuplicateException; use Utopia\Database\Exception\Duplicate as DuplicateException;
use Utopia\Database\Exception\Limit as LimitException; use Utopia\Database\Exception\Limit as LimitException;
use Utopia\Database\Exception\Restricted as RestrictedException; use Utopia\Database\Exception\Restricted as RestrictedException;
@ -57,13 +58,26 @@ use Utopia\Validator\URL;
use Utopia\Validator\WhiteList; use Utopia\Validator\WhiteList;
/** /**
* Create attribute of varying type * * Create attribute of varying type
*
* *
* @param string $databaseId
* @param string $collectionId
* @param Document $attribute
* @param Response $response
* @param Database $dbForProject
* @param EventDatabase $queueForDatabase
* @param Event $queueForEvents
* @return Document Newly created attribute document * @return Document Newly created attribute document
* @throws AuthorizationException
* @throws Exception * @throws Exception
* @throws LimitException
* @throws RestrictedException
* @throws StructureException
* @throws \Utopia\Database\Exception
* @throws Conflict
* @throws \Utopia\Exception
*/ */
function createAttribute(string $databaseId, string $collectionId, Document $attribute, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $events): Document function createAttribute(string $databaseId, string $collectionId, Document $attribute, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents): Document
{ {
$key = $attribute->getAttribute('key'); $key = $attribute->getAttribute('key');
$type = $attribute->getAttribute('type', ''); $type = $attribute->getAttribute('type', '');
@ -1529,7 +1543,7 @@ App::post('/v1/databases/:databaseId/collections/:collectionId/attributes/dateti
'default' => $default, 'default' => $default,
'array' => $array, 'array' => $array,
'filters' => $filters, 'filters' => $filters,
]), $response, $dbForProject, $database, $events); ]), $response, $dbForProject, $queueForDatabase, $queueForEvents);
$response $response
->setStatusCode(Response::STATUS_CODE_ACCEPTED) ->setStatusCode(Response::STATUS_CODE_ACCEPTED)

View file

@ -285,7 +285,12 @@ App::post('/v1/functions')
/** Trigger Webhook */ /** Trigger Webhook */
$ruleModel = new Rule(); $ruleModel = new Rule();
$ruleCreate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); $ruleCreate =
$queueForEvents
->setClass(Event::WEBHOOK_CLASS_NAME)
->setQueue(Event::WEBHOOK_QUEUE_NAME)
;
$ruleCreate $ruleCreate
->setProject($project) ->setProject($project)
->setEvent('rules.[ruleId].create') ->setEvent('rules.[ruleId].create')
@ -999,7 +1004,9 @@ App::post('/v1/functions/:functionId/deployments')
->inject('deviceLocal') ->inject('deviceLocal')
->inject('dbForConsole') ->inject('dbForConsole')
->inject('queueForBuilds') ->inject('queueForBuilds')
->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $queueForEvents, Document $project, Device $deviceFunctions, Device $deviceLocal, Database $dbForConsole, Build $queueForBuilds) { ->action(function (string $functionId, string $entrypoint, ?string $commands, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $queueForEvents, Document $project, Device $deviceFunctions, Device $deviceLocal, Database $dbForConsole, Build $queueForBuilds) {
$activate = filter_var($activate, FILTER_VALIDATE_BOOLEAN);
$function = $dbForProject->getDocument('functions', $functionId); $function = $dbForProject->getDocument('functions', $functionId);

View file

@ -51,8 +51,9 @@ App::post('/v1/migrations/appwrite')
->inject('dbForProject') ->inject('dbForProject')
->inject('project') ->inject('project')
->inject('user') ->inject('user')
->inject('events') ->inject('queueForEvents')
->action(function (array $resources, string $endpoint, string $projectId, string $apiKey, Response $response, Database $dbForProject, Document $project, Document $user, Event $events) { ->inject('queueForMigrations')
->action(function (array $resources, string $endpoint, string $projectId, string $apiKey, Response $response, Database $dbForProject, Document $project, Document $user, Event $queueForEvents, Migration $queueForMigrations) {
$migration = $dbForProject->createDocument('migrations', new Document([ $migration = $dbForProject->createDocument('migrations', new Document([
'$id' => ID::unique(), '$id' => ID::unique(),
'status' => 'pending', 'status' => 'pending',
@ -69,11 +70,10 @@ App::post('/v1/migrations/appwrite')
'errors' => [], 'errors' => [],
])); ]));
$events->setParam('migrationId', $migration->getId()); $queueForEvents->setParam('migrationId', $migration->getId());
// Trigger Transfer // Trigger Transfer
$event = new Migration(); $queueForMigrations
$event
->setMigration($migration) ->setMigration($migration)
->setProject($project) ->setProject($project)
->setUser($user) ->setUser($user)
@ -104,9 +104,10 @@ App::post('/v1/migrations/firebase/oauth')
->inject('dbForConsole') ->inject('dbForConsole')
->inject('project') ->inject('project')
->inject('user') ->inject('user')
->inject('events') ->inject('queueForEvents')
->inject('queueForMigrations')
->inject('request') ->inject('request')
->action(function (array $resources, string $projectId, Response $response, Database $dbForProject, Database $dbForConsole, Document $project, Document $user, Event $events, Request $request) { ->action(function (array $resources, string $projectId, Response $response, Database $dbForProject, Database $dbForConsole, Document $project, Document $user, Event $queueForEvents, Migration $queueForMigrations, Request $request) {
$firebase = new OAuth2Firebase( $firebase = new OAuth2Firebase(
App::getEnv('_APP_MIGRATIONS_FIREBASE_CLIENT_ID', ''), App::getEnv('_APP_MIGRATIONS_FIREBASE_CLIENT_ID', ''),
App::getEnv('_APP_MIGRATIONS_FIREBASE_CLIENT_SECRET', ''), App::getEnv('_APP_MIGRATIONS_FIREBASE_CLIENT_SECRET', ''),
@ -171,11 +172,10 @@ App::post('/v1/migrations/firebase/oauth')
'errors' => [] 'errors' => []
])); ]));
$events->setParam('migrationId', $migration->getId()); $queueForEvents->setParam('migrationId', $migration->getId());
// Trigger Transfer // Trigger Transfer
$event = new Migration(); $queueForMigrations
$event
->setMigration($migration) ->setMigration($migration)
->setProject($project) ->setProject($project)
->setUser($user) ->setUser($user)
@ -205,8 +205,9 @@ App::post('/v1/migrations/firebase')
->inject('dbForProject') ->inject('dbForProject')
->inject('project') ->inject('project')
->inject('user') ->inject('user')
->inject('events') ->inject('queueForEvents')
->action(function (array $resources, string $serviceAccount, Response $response, Database $dbForProject, Document $project, Document $user, Event $events) { ->inject('queueForMigrations')
->action(function (array $resources, string $serviceAccount, Response $response, Database $dbForProject, Document $project, Document $user, Event $queueForEvents, Migration $queueForMigrations) {
$migration = $dbForProject->createDocument('migrations', new Document([ $migration = $dbForProject->createDocument('migrations', new Document([
'$id' => ID::unique(), '$id' => ID::unique(),
'status' => 'pending', 'status' => 'pending',
@ -221,11 +222,10 @@ App::post('/v1/migrations/firebase')
'errors' => [], 'errors' => [],
])); ]));
$events->setParam('migrationId', $migration->getId()); $queueForEvents->setParam('migrationId', $migration->getId());
// Trigger Transfer // Trigger Transfer
$event = new Migration(); $queueForMigrations
$event
->setMigration($migration) ->setMigration($migration)
->setProject($project) ->setProject($project)
->setUser($user) ->setUser($user)
@ -260,8 +260,9 @@ App::post('/v1/migrations/supabase')
->inject('dbForProject') ->inject('dbForProject')
->inject('project') ->inject('project')
->inject('user') ->inject('user')
->inject('events') ->inject('queueForEvents')
->action(function (array $resources, string $endpoint, string $apiKey, string $databaseHost, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, Document $user, Event $events) { ->inject('queueForMigrations')
->action(function (array $resources, string $endpoint, string $apiKey, string $databaseHost, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, Document $user, Event $queueForEvents, Migration $queueForMigrations) {
$migration = $dbForProject->createDocument('migrations', new Document([ $migration = $dbForProject->createDocument('migrations', new Document([
'$id' => ID::unique(), '$id' => ID::unique(),
'status' => 'pending', 'status' => 'pending',
@ -281,11 +282,10 @@ App::post('/v1/migrations/supabase')
'errors' => [], 'errors' => [],
])); ]));
$events->setParam('migrationId', $migration->getId()); $queueForEvents->setParam('migrationId', $migration->getId());
// Trigger Transfer // Trigger Transfer
$event = new Migration(); $queueForMigrations
$event
->setMigration($migration) ->setMigration($migration)
->setProject($project) ->setProject($project)
->setUser($user) ->setUser($user)
@ -321,8 +321,9 @@ App::post('/v1/migrations/nhost')
->inject('dbForProject') ->inject('dbForProject')
->inject('project') ->inject('project')
->inject('user') ->inject('user')
->inject('events') ->inject('queueForEvents')
->action(function (array $resources, string $subdomain, string $region, string $adminSecret, string $database, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, Document $user, Event $events) { ->inject('queueForMigrations')
->action(function (array $resources, string $subdomain, string $region, string $adminSecret, string $database, string $username, string $password, int $port, Response $response, Database $dbForProject, Document $project, Document $user, Event $queueForEvents, Migration $queueForMigrations) {
$migration = $dbForProject->createDocument('migrations', new Document([ $migration = $dbForProject->createDocument('migrations', new Document([
'$id' => ID::unique(), '$id' => ID::unique(),
'status' => 'pending', 'status' => 'pending',
@ -343,11 +344,10 @@ App::post('/v1/migrations/nhost')
'errors' => [], 'errors' => [],
])); ]));
$events->setParam('migrationId', $migration->getId()); $queueForEvents->setParam('migrationId', $migration->getId());
// Trigger Transfer // Trigger Transfer
$event = new Migration(); $queueForMigrations
$event
->setMigration($migration) ->setMigration($migration)
->setProject($project) ->setProject($project)
->setUser($user) ->setUser($user)
@ -931,8 +931,8 @@ App::patch('/v1/migrations/:migrationId')
->inject('dbForProject') ->inject('dbForProject')
->inject('project') ->inject('project')
->inject('user') ->inject('user')
->inject('events') ->inject('queueForMigrations')
->action(function (string $migrationId, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventInstance) { ->action(function (string $migrationId, Response $response, Database $dbForProject, Document $project, Document $user, Migration $queueForMigrations) {
$migration = $dbForProject->getDocument('migrations', $migrationId); $migration = $dbForProject->getDocument('migrations', $migrationId);
if ($migration->isEmpty()) { if ($migration->isEmpty()) {
@ -948,8 +948,7 @@ App::patch('/v1/migrations/:migrationId')
->setAttribute('dateUpdated', \time()); ->setAttribute('dateUpdated', \time());
// Trigger Migration // Trigger Migration
$event = new Migration(); $queueForMigrations
$event
->setMigration($migration) ->setMigration($migration)
->setProject($project) ->setProject($project)
->setUser($user) ->setUser($user)
@ -974,8 +973,8 @@ App::delete('/v1/migrations/:migrationId')
->param('migrationId', '', new UID(), 'Migration ID.') ->param('migrationId', '', new UID(), 'Migration ID.')
->inject('response') ->inject('response')
->inject('dbForProject') ->inject('dbForProject')
->inject('events') ->inject('queueForEvents')
->action(function (string $migrationId, Response $response, Database $dbForProject, Event $events) { ->action(function (string $migrationId, Response $response, Database $dbForProject, Event $queueForEvents) {
$migration = $dbForProject->getDocument('migrations', $migrationId); $migration = $dbForProject->getDocument('migrations', $migrationId);
if ($migration->isEmpty()) { if ($migration->isEmpty()) {
@ -986,7 +985,7 @@ App::delete('/v1/migrations/:migrationId')
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove migration from DB'); throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove migration from DB');
} }
$events->setParam('migrationId', $migration->getId()); $queueForEvents->setParam('migrationId', $migration->getId());
$response->noContent(); $response->noContent();
}); });

View file

@ -388,7 +388,9 @@ App::post('/v1/teams/:teamId/memberships')
->inject('queueForMails') ->inject('queueForMails')
->inject('queueForMessaging') ->inject('queueForMessaging')
->inject('queueForEvents') ->inject('queueForEvents')
->action(function (string $teamId, string $email, array $roles, string $url, string $name, Response $response, Document $project, Document $user, Database $dbForProject, Locale $locale, Mail $queueForMails, EventPhone $queueForMessaging, Event $queueForEvents) { ->action(function (string $teamId, string $email, string $userId, string $phone, array $roles, string $url, string $name, Response $response, Document $project, Document $user, Database $dbForProject, Locale $locale, Mail $queueForMails, EventPhone $queueForMessaging, Event $queueForEvents) {
$isAPIKey = Auth::isAppUser(Authorization::getRoles());
$isPrivilegedUser = Auth::isPrivilegedUser(Authorization::getRoles());
if (empty($url)) { if (empty($url)) {
if (!$isAPIKey && !$isPrivilegedUser) { if (!$isAPIKey && !$isPrivilegedUser) {
@ -574,7 +576,7 @@ App::post('/v1/teams/:teamId/memberships')
$replyTo = $smtp['replyTo']; $replyTo = $smtp['replyTo'];
} }
$mails $queueForMails
->setSmtpHost($smtp['host'] ?? '') ->setSmtpHost($smtp['host'] ?? '')
->setSmtpPort($smtp['port'] ?? '') ->setSmtpPort($smtp['port'] ?? '')
->setSmtpUsername($smtp['username'] ?? '') ->setSmtpUsername($smtp['username'] ?? '')
@ -1094,7 +1096,6 @@ App::get('/v1/teams/:teamId/logs')
$audit = new Audit($dbForProject); $audit = new Audit($dbForProject);
$resource = 'team/' . $team->getId(); $resource = 'team/' . $team->getId();
$logs = $audit->getLogsByResource($resource, $limit, $offset); $logs = $audit->getLogsByResource($resource, $limit, $offset);
$output = []; $output = [];
foreach ($logs as $i => &$log) { foreach ($logs as $i => &$log) {

View file

@ -249,8 +249,8 @@ App::init()
$queueForDatabase->setProject($project); $queueForDatabase->setProject($project);
$dbForProject $dbForProject
->on(Database::EVENT_DOCUMENT_CREATE, fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)) ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject))
->on(Database::EVENT_DOCUMENT_DELETE, fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)) ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject))
; ;
$useCache = $route->getLabel('cache', false); $useCache = $route->getLabel('cache', false);
@ -427,7 +427,6 @@ App::shutdown()
$responsePayload = $response->getPayload(); $responsePayload = $response->getPayload();
if (!empty($queueForEvents->getEvent())) { if (!empty($queueForEvents->getEvent())) {
if (empty($queueForEvents->getPayload())) { if (empty($queueForEvents->getPayload())) {
$queueForEvents->setPayload($responsePayload); $queueForEvents->setPayload($responsePayload);
} }
@ -497,7 +496,7 @@ App::shutdown()
} }
if (!$user->isEmpty()) { if (!$user->isEmpty()) {
$audits->setUser($user); $queueForAudits->setUser($user);
} }
if (!empty($queueForAudits->getResource()) && !empty($queueForAudits->getUser()->getId())) { if (!empty($queueForAudits->getResource()) && !empty($queueForAudits->getUser()->getId())) {

View file

@ -18,6 +18,7 @@ ini_set('display_startup_errors', 1);
ini_set('default_socket_timeout', -1); ini_set('default_socket_timeout', -1);
error_reporting(E_ALL); error_reporting(E_ALL);
use Appwrite\Event\Migration;
use Appwrite\Event\Usage; use Appwrite\Event\Usage;
use Appwrite\Extend\Exception; use Appwrite\Extend\Exception;
use Appwrite\Auth\Auth; use Appwrite\Auth\Auth;
@ -53,6 +54,7 @@ use Utopia\Messaging\Adapters\SMS\Telesign;
use Utopia\Messaging\Adapters\SMS\TextMagic; use Utopia\Messaging\Adapters\SMS\TextMagic;
use Utopia\Messaging\Adapters\SMS\Twilio; use Utopia\Messaging\Adapters\SMS\Twilio;
use Utopia\Messaging\Adapters\SMS\Vonage; use Utopia\Messaging\Adapters\SMS\Vonage;
use Utopia\Queue\Server;
use Utopia\Registry\Registry; use Utopia\Registry\Registry;
use Utopia\Storage\Device; use Utopia\Storage\Device;
use Utopia\Storage\Device\Backblaze; use Utopia\Storage\Device\Backblaze;
@ -896,7 +898,9 @@ App::setResource('queueForUsage', function (Connection $queue) {
App::setResource('queueForCertificates', function (Connection $queue) { App::setResource('queueForCertificates', function (Connection $queue) {
return new Certificate($queue); return new Certificate($queue);
}, ['queue']); }, ['queue']);
App::setResource('queueForMigrations', function (Connection $queue) {
return new Migration($queue);
}, ['queue']);
App::setResource('clients', function ($request, $console, $project) { App::setResource('clients', function ($request, $console, $project) {
$console->setAttribute('platforms', [ // Always allow current host $console->setAttribute('platforms', [ // Always allow current host
'$collection' => ID::custom('platforms'), '$collection' => ID::custom('platforms'),

View file

@ -10,6 +10,7 @@ use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete; use Appwrite\Event\Delete;
use Appwrite\Event\Func; use Appwrite\Event\Func;
use Appwrite\Event\Mail; use Appwrite\Event\Mail;
use Appwrite\Event\Migration;
use Appwrite\Event\Phone; use Appwrite\Event\Phone;
use Appwrite\Event\Usage; use Appwrite\Event\Usage;
use Appwrite\Platform\Appwrite; use Appwrite\Platform\Appwrite;
@ -76,7 +77,7 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register,
Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases): Database {
if ($project->isEmpty() || $project->getId() === 'console') { if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole; return $dbForConsole;
} }
@ -154,6 +155,9 @@ Server::setResource('queueForCertificates', function (Connection $queue) {
Server::setResource('queueForUsage', function (Connection $queue) { Server::setResource('queueForUsage', function (Connection $queue) {
return new Usage($queue); return new Usage($queue);
}, ['queue']); }, ['queue']);
Server::setResource('queueForMigrations', function (Connection $queue) {
return new Migration($queue);
}, ['queue']);
Server::setResource('logger', function (Registry $register) { Server::setResource('logger', function (Registry $register) {
return $register->get('logger'); return $register->get('logger');
}, ['register']); }, ['register']);
@ -169,7 +173,7 @@ Server::setResource('log', fn() => new Log());
* @param string $projectId of the project * @param string $projectId of the project
* @return Device * @return Device
*/ */
Server::setResource('deviceFunctions', function () { Server::setResource('getFunctionsDevice', function () {
return function (string $projectId) { return function (string $projectId) {
return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $projectId); return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $projectId);
}; };
@ -180,7 +184,7 @@ Server::setResource('deviceFunctions', function () {
* @param string $projectId of the project * @param string $projectId of the project
* @return Device * @return Device
*/ */
Server::setResource('deviceFiles', function () { Server::setResource('getFilesDevice', function () {
return function (string $projectId) { return function (string $projectId) {
return getDevice(APP_STORAGE_UPLOADS . '/app-' . $projectId); return getDevice(APP_STORAGE_UPLOADS . '/app-' . $projectId);
}; };
@ -191,7 +195,7 @@ Server::setResource('deviceFiles', function () {
* @param string $projectId of the project * @param string $projectId of the project
* @return Device * @return Device
*/ */
Server::setResource('deviceBuilds', function () { Server::setResource('getBuildsDevice', function () {
return function (string $projectId) { return function (string $projectId) {
return getDevice(APP_STORAGE_BUILDS . '/app-' . $projectId); return getDevice(APP_STORAGE_BUILDS . '/app-' . $projectId);
}; };
@ -202,7 +206,7 @@ Server::setResource('deviceBuilds', function () {
* @param string $projectId of the project * @param string $projectId of the project
* @return Device * @return Device
*/ */
Server::setResource('deviceCache', function () { Server::setResource('getCacheDevice', function () {
return function (string $projectId) { return function (string $projectId) {
return getDevice(APP_STORAGE_CACHE . '/app-' . $projectId); return getDevice(APP_STORAGE_CACHE . '/app-' . $projectId);
}; };
@ -286,4 +290,4 @@ try {
}); });
} }
$worker->start(); $worker->start();

View file

@ -1,10 +1,3 @@
#!/bin/sh #!/bin/sh
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] php /usr/src/code/app/worker.php migrations $@
then
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
else
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
fi
INTERVAL=0.1 QUEUE='v1-migrations' APP_INCLUDE='/usr/src/code/app/workers/migrations.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php

View file

@ -6,15 +6,21 @@ use DateTime;
use Resque; use Resque;
use ResqueScheduler; use ResqueScheduler;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Migration extends Event class Migration extends Event
{ {
protected string $type = ''; protected string $type = '';
protected ?Document $migration = null; protected ?Document $migration = null;
public function __construct() public function __construct(protected Connection $connection)
{ {
parent::__construct(Event::MIGRATIONS_QUEUE_NAME, Event::MIGRATIONS_CLASS_NAME); parent::__construct($connection);
$this
->setQueue(Event::MIGRATIONS_QUEUE_NAME)
->setClass(Event::MIGRATIONS_CLASS_NAME);
} }
/** /**
@ -72,7 +78,10 @@ class Migration extends Event
*/ */
public function trigger(): string|bool public function trigger(): string|bool
{ {
return Resque::enqueue($this->queue, $this->class, [
$client = new Client($this->queue, $this->connection);
return $client->enqueue([
'project' => $this->project, 'project' => $this->project,
'user' => $this->user, 'user' => $this->user,
'migration' => $this->migration 'migration' => $this->migration
@ -89,10 +98,11 @@ class Migration extends Event
*/ */
public function schedule(DateTime|int $at): void public function schedule(DateTime|int $at): void
{ {
ResqueScheduler::enqueueAt($at, $this->queue, $this->class, [ return;
'project' => $this->project, // ResqueScheduler::enqueueAt($at, $this->queue, $this->class, [
'user' => $this->user, // 'project' => $this->project,
'migration' => $this->migration // 'user' => $this->user,
]); // 'migration' => $this->migration
// ]);
} }
} }

View file

@ -14,6 +14,7 @@ use Appwrite\Platform\Workers\Builds;
use Appwrite\Platform\Workers\Deletes; use Appwrite\Platform\Workers\Deletes;
use Appwrite\Platform\Workers\Usage; use Appwrite\Platform\Workers\Usage;
use Appwrite\Platform\Workers\UsageHook; use Appwrite\Platform\Workers\UsageHook;
use Appwrite\Platform\Workers\Migrations;
class Workers extends Service class Workers extends Service
{ {
@ -32,6 +33,7 @@ class Workers extends Service
->addAction(Deletes::getName(), new Deletes()) ->addAction(Deletes::getName(), new Deletes())
->addAction(UsageHook::getName(), new UsageHook()) ->addAction(UsageHook::getName(), new UsageHook())
->addAction(Usage::getName(), new Usage()) ->addAction(Usage::getName(), new Usage())
->addAction(Usage::getName(), new Migrations())
; ;
} }

View file

@ -7,6 +7,8 @@ use Throwable;
use Utopia\Audit\Audit; use Utopia\Audit\Audit;
use Utopia\Database\Database; use Utopia\Database\Database;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Database\Exception\Authorization;
use Utopia\Database\Exception\Structure;
use Utopia\Platform\Action; use Utopia\Platform\Action;
use Utopia\Queue\Message; use Utopia\Queue\Message;
@ -31,8 +33,13 @@ class Audits extends Action
/** /**
* @throws Exception * @param Message $message
* @param Database $dbForProject
* @return void
* @throws Throwable * @throws Throwable
* @throws \Utopia\Database\Exception
* @throws Authorization
* @throws Structure
*/ */
public function action(Message $message, Database $dbForProject): void public function action(Message $message, Database $dbForProject): void
{ {

View file

@ -18,12 +18,13 @@ use Utopia\Config\Config;
use Utopia\Database\Database; use Utopia\Database\Database;
use Utopia\Database\DateTime; use Utopia\Database\DateTime;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Restricted;
use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\Authorization;
use Utopia\Database\Exception\Structure; use Utopia\Database\Exception\Structure;
use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\ID;
use Utopia\Platform\Action; use Utopia\Platform\Action;
use Utopia\Queue\Message; use Utopia\Queue\Message;
use Utopia\Storage\Device;
use Utopia\Storage\Device\Local; use Utopia\Storage\Device\Local;
use Utopia\Storage\Storage; use Utopia\Storage\Storage;
use Utopia\VCS\Adapter\Git\GitHub; use Utopia\VCS\Adapter\Git\GitHub;
@ -54,7 +55,17 @@ class Builds extends Action
} }
/** /**
* @throws Exception|\Throwable * @param Message $message
* @param Database $dbForConsole
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Usage $queueForUsage
* @param Cache $cache
* @param callable $getProjectDB
* @param callable $deviceFunctions
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/ */
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB, callable $deviceFunctions): void public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB, callable $deviceFunctions): void
{ {
@ -84,11 +95,21 @@ class Builds extends Action
} }
/** /**
* @throws Authorization * @param callable $deviceFunctions
* @throws \Throwable * @param Func $queueForFunctions
* @throws Structure * @param Event $queueForEvents
* @param Usage $queueForUsage
* @param Database $dbForConsole
* @param callable $getProjectDB
* @param GitHub $github
* @param Document $project
* @param Document $function
* @param Document $deployment
* @param Document $template
* @return void
* @throws \Utopia\Database\Exception
*/ */
protected function buildDeployment(callable $deviceFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, callable $getProjectDB, GitHub $github, Document $project, Document $function, Document $deployment, Document $template) protected function buildDeployment(callable $deviceFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, callable $getProjectDB, GitHub $github, Document $project, Document $function, Document $deployment, Document $template): void
{ {
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
@ -518,6 +539,24 @@ class Builds extends Action
} }
} }
/**
* @param string $status
* @param GitHub $github
* @param string $providerCommitHash
* @param string $owner
* @param string $repositoryName
* @param Document $project
* @param Document $function
* @param string $deploymentId
* @param Database $dbForProject
* @param Database $dbForConsole
* @return void
* @throws Structure
* @throws \Utopia\Database\Exception
* @throws Authorization
* @throws Conflict
* @throws Restricted
*/
protected function runGitAction(string $status, GitHub $github, string $providerCommitHash, string $owner, string $repositoryName, Document $project, Document $function, string $deploymentId, Database $dbForProject, Database $dbForConsole): void protected function runGitAction(string $status, GitHub $github, string $providerCommitHash, string $owner, string $repositoryName, Document $project, Document $function, string $deploymentId, Database $dbForProject, Database $dbForConsole): void
{ {
if ($function->getAttribute('providerSilentMode', false) === true) { if ($function->getAttribute('providerSilentMode', false) === true) {

View file

@ -16,6 +16,9 @@ use Utopia\CLI\Console;
use Utopia\Database\Database; use Utopia\Database\Database;
use Utopia\Database\DateTime; use Utopia\Database\DateTime;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Database\Exception\Authorization;
use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\ID;
use Utopia\Database\Query; use Utopia\Database\Query;
use Utopia\Domains\Domain; use Utopia\Domains\Domain;
@ -46,7 +49,14 @@ class Certificates extends Action
} }
/** /**
* @throws Exception|Throwable * @param Message $message
* @param Database $dbForConsole
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
*/ */
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions): void public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions): void
{ {
@ -64,7 +74,15 @@ class Certificates extends Action
} }
/** /**
* @throws Exception|Throwable * @param Domain $domain
* @param Database $dbForConsole
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param bool $skipRenewCheck
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
*/ */
private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, bool $skipRenewCheck = false): void private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, bool $skipRenewCheck = false): void
{ {
@ -176,9 +194,15 @@ class Certificates extends Action
* *
* @param string $domain Domain name that certificate is for * @param string $domain Domain name that certificate is for
* @param Document $certificate Certificate document that we need to save * @param Document $certificate Certificate document that we need to save
* @param bool $success
* @param Database $dbForConsole Database connection for console * @param Database $dbForConsole Database connection for console
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @return void * @return void
* @throws Exception|Throwable * @throws \Utopia\Database\Exception
* @throws Authorization
* @throws Conflict
* @throws Structure
*/ */
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void
{ {

View file

@ -39,6 +39,7 @@ class Databases extends Action
* @param Message $message * @param Message $message
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @return void
* @throws Exception * @throws Exception
*/ */
public function action(Message $message, Database $dbForConsole, Database $dbForProject): void public function action(Message $message, Database $dbForConsole, Database $dbForProject): void
@ -79,6 +80,7 @@ class Databases extends Action
* @param Document $project * @param Document $project
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
* @throws Exception * @throws Exception
@ -206,6 +208,7 @@ class Databases extends Action
* @param Document $project * @param Document $project
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
* @throws Exception * @throws Exception
@ -369,12 +372,13 @@ class Databases extends Action
* @param Document $project * @param Document $project
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
* @throws Structure * @throws Structure
* @throws DatabaseException * @throws DatabaseException
*/ */
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject) private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void
{ {
$projectId = $project->getId(); $projectId = $project->getId();
@ -439,12 +443,13 @@ class Databases extends Action
* @param Document $project * @param Document $project
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
* @throws Structure * @throws Structure
* @throws DatabaseException * @throws DatabaseException
*/ */
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject) private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void
{ {
$projectId = $project->getId(); $projectId = $project->getId();

View file

@ -137,7 +137,7 @@ class Deletes extends Action
$this->deleteExpiredSessions($dbForConsole, $getProjectDB); $this->deleteExpiredSessions($dbForConsole, $getProjectDB);
break; break;
case DELETE_TYPE_USAGE: case DELETE_TYPE_USAGE:
$this->deleteUsageStats($getProjectDB, $hourlyUsageRetentionDatetime); $this->deleteUsageStats($dbForConsole, $getProjectDB, $hourlyUsageRetentionDatetime);
break; break;
case DELETE_TYPE_CACHE_BY_RESOURCE: case DELETE_TYPE_CACHE_BY_RESOURCE:
$this->deleteCacheByResource($project, $getProjectDB, $resource); $this->deleteCacheByResource($project, $getProjectDB, $resource);
@ -195,6 +195,7 @@ class Deletes extends Action
* @param Document $project * @param Document $project
* @param callable $getProjectDB * @param callable $getProjectDB
* @param string $resource * @param string $resource
* @return void
* @throws Authorization * @throws Authorization
*/ */
protected function deleteCacheByResource(Document $project, callable $getProjectDB, string $resource): void protected function deleteCacheByResource(Document $project, callable $getProjectDB, string $resource): void
@ -266,6 +267,7 @@ class Deletes extends Action
* @param callable $getProjectDB * @param callable $getProjectDB
* @param Document $document * @param Document $document
* @param Document $project * @param Document $project
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteDatabase(callable $getProjectDB, Document $document, Document $project): void protected function deleteDatabase(callable $getProjectDB, Document $document, Document $project): void
@ -285,6 +287,7 @@ class Deletes extends Action
* @param callable $getProjectDB * @param callable $getProjectDB
* @param Document $document teams document * @param Document $document teams document
* @param Document $project * @param Document $project
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteCollection(callable $getProjectDB, Document $document, Document $project): void protected function deleteCollection(callable $getProjectDB, Document $document, Document $project): void
@ -330,9 +333,10 @@ class Deletes extends Action
* @param Database $dbForConsole * @param Database $dbForConsole
* @param callable $getProjectDB * @param callable $getProjectDB
* @param string $hourlyUsageRetentionDatetime * @param string $hourlyUsageRetentionDatetime
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteUsageStats(Database $dbForConsole, callable $getProjectDB, string $hourlyUsageRetentionDatetime) protected function deleteUsageStats(Database $dbForConsole, callable $getProjectDB, string $hourlyUsageRetentionDatetime): void
{ {
$this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $hourlyUsageRetentionDatetime) { $this->deleteForProjectIds($dbForConsole, function (Document $project) use ($getProjectDB, $hourlyUsageRetentionDatetime) {
$dbForProject = $getProjectDB($project); $dbForProject = $getProjectDB($project);
@ -348,6 +352,7 @@ class Deletes extends Action
* @param callable $getProjectDB * @param callable $getProjectDB
* @param Document $document teams document * @param Document $document teams document
* @param Document $project * @param Document $project
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteMemberships(callable $getProjectDB, Document $document, Document $project): void protected function deleteMemberships(callable $getProjectDB, Document $document, Document $project): void
@ -400,8 +405,10 @@ class Deletes extends Action
* @param callable $getBuildsDevice * @param callable $getBuildsDevice
* @param callable $getCacheDevice * @param callable $getCacheDevice
* @param Document $document * @param Document $document
* @throws Authorization|\Utopia\Database\Exception * @return void
* @throws Exception * @throws Exception
* @throws Authorization
* @throws \Utopia\Database\Exception
*/ */
protected function deleteProject(Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Document $document): void protected function deleteProject(Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Document $document): void
{ {
@ -475,6 +482,8 @@ class Deletes extends Action
/** /**
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Document $document certificates document * @param Document $document certificates document
* @return void
* @throws Exception
*/ */
protected function deleteCertificates(Database $dbForConsole, Document $document): void protected function deleteCertificates(Database $dbForConsole, Document $document): void
{ {
@ -522,6 +531,7 @@ class Deletes extends Action
* @param callable $getProjectDB * @param callable $getProjectDB
* @param Document $document user document * @param Document $document user document
* @param Document $project * @param Document $project
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteUser(callable $getProjectDB, Document $document, Document $project): void protected function deleteUser(callable $getProjectDB, Document $document, Document $project): void
@ -570,6 +580,7 @@ class Deletes extends Action
* @param database $dbForConsole * @param database $dbForConsole
* @param callable $getProjectDB * @param callable $getProjectDB
* @param string $datetime * @param string $datetime
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteExecutionLogs(database $dbForConsole, callable $getProjectDB, string $datetime): void protected function deleteExecutionLogs(database $dbForConsole, callable $getProjectDB, string $datetime): void
@ -609,6 +620,7 @@ class Deletes extends Action
* @param Database $dbForConsole * @param Database $dbForConsole
* @param callable $getProjectDB * @param callable $getProjectDB
* @param string $datetime * @param string $datetime
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteRealtimeUsage(Database $dbForConsole, callable $getProjectDB, string $datetime): void protected function deleteRealtimeUsage(Database $dbForConsole, callable $getProjectDB, string $datetime): void
@ -626,6 +638,7 @@ class Deletes extends Action
* @param Database $dbForConsole * @param Database $dbForConsole
* @param callable $getProjectDB * @param callable $getProjectDB
* @param string $datetime * @param string $datetime
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteAbuseLogs(Database $dbForConsole, callable $getProjectDB, string $datetime): void protected function deleteAbuseLogs(Database $dbForConsole, callable $getProjectDB, string $datetime): void
@ -650,6 +663,7 @@ class Deletes extends Action
* @param Database $dbForConsole * @param Database $dbForConsole
* @param callable $getProjectDB * @param callable $getProjectDB
* @param string $datetime * @param string $datetime
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteAuditLogs(Database $dbForConsole, callable $getProjectDB, string $datetime): void protected function deleteAuditLogs(Database $dbForConsole, callable $getProjectDB, string $datetime): void
@ -673,6 +687,7 @@ class Deletes extends Action
* @param callable $getProjectDB * @param callable $getProjectDB
* @param string $resource * @param string $resource
* @param Document $project * @param Document $project
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteAuditLogsByResource(callable $getProjectDB, string $resource, Document $project): void protected function deleteAuditLogsByResource(callable $getProjectDB, string $resource, Document $project): void
@ -688,8 +703,9 @@ class Deletes extends Action
* @param callable $getProjectDB * @param callable $getProjectDB
* @param callable $getFunctionsDevice * @param callable $getFunctionsDevice
* @param callable $getBuildsDevice * @param callable $getBuildsDevice
* @param Document $document * @param Document $document function document
* @param Document $project * @param Document $project
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteFunction(Database $dbForConsole, callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void protected function deleteFunction(Database $dbForConsole, callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void
@ -775,6 +791,7 @@ class Deletes extends Action
* @param callable $getBuildsDevice * @param callable $getBuildsDevice
* @param Document $document * @param Document $document
* @param Document $project * @param Document $project
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteDeployment(callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void protected function deleteDeployment(callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void
@ -815,7 +832,7 @@ class Deletes extends Action
* Request executor to delete all deployment containers * Request executor to delete all deployment containers
*/ */
Console::info("Requesting executor to delete deployment container for deployment " . $deploymentId); Console::info("Requesting executor to delete deployment container for deployment " . $deploymentId);
$this->deleteRuntimes($document, $project); $this->deleteRuntimes($getProjectDB, $document, $project);
} }
@ -825,7 +842,6 @@ class Deletes extends Action
* @param Database $database to delete it from * @param Database $database to delete it from
* @param callable|null $callback to perform after document is deleted * @param callable|null $callback to perform after document is deleted
* @return bool * @return bool
* @throws Authorization
*/ */
protected function deleteById(Document $document, Database $database, callable $callback = null): bool protected function deleteById(Document $document, Database $database, callable $callback = null): bool
{ {
@ -854,9 +870,7 @@ class Deletes extends Action
$count = 0; $count = 0;
$chunk = 0; $chunk = 0;
$limit = 50; $limit = 50;
$projects = [];
$sum = $limit; $sum = $limit;
$executionStart = \microtime(true); $executionStart = \microtime(true);
while ($sum === $limit) { while ($sum === $limit) {
@ -883,6 +897,7 @@ class Deletes extends Action
* @param array $queries * @param array $queries
* @param Database $database * @param Database $database
* @param callable|null $callback * @param callable|null $callback
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void
@ -920,6 +935,7 @@ class Deletes extends Action
* @param Query[] $queries * @param Query[] $queries
* @param Database $database * @param Database $database
* @param callable|null $callback * @param callable|null $callback
* @return void
* @throws Exception * @throws Exception
*/ */
protected function listByGroup(string $collection, array $queries, Database $database, callable $callback = null): void protected function listByGroup(string $collection, array $queries, Database $database, callable $callback = null): void
@ -957,6 +973,7 @@ class Deletes extends Action
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Document $document rule document * @param Document $document rule document
* @param Document $project project document * @param Document $project project document
* @return void
*/ */
protected function deleteRule(Database $dbForConsole, Document $document, Document $project): void protected function deleteRule(Database $dbForConsole, Document $document, Document $project): void
{ {
@ -1005,8 +1022,9 @@ class Deletes extends Action
* @param Document $document * @param Document $document
* @param Document $project * @param Document $project
* @return void * @return void
* @throws Exception
*/ */
protected function deleteInstallation(Database $dbForConsole, callable $getProjectDB, Document $document, Document $project) protected function deleteInstallation(Database $dbForConsole, callable $getProjectDB, Document $document, Document $project): void
{ {
$dbForProject = $getProjectDB($project); $dbForProject = $getProjectDB($project);
@ -1032,9 +1050,10 @@ class Deletes extends Action
* @param callable $getProjectDB * @param callable $getProjectDB
* @param ?Document $function * @param ?Document $function
* @param Document $project * @param Document $project
* @return void
* @throws Exception * @throws Exception
*/ */
protected function deleteRuntimes(callable $getProjectDB, ?Document $function, Document $project) protected function deleteRuntimes(callable $getProjectDB, ?Document $function, Document $project): void
{ {
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));

View file

@ -49,7 +49,15 @@ class Functions extends Action
} }
/** /**
* @throws Exception|Throwable * @param Message $message
* @param Database $dbForProject
* @param Func $queueForFunctions
* @param Event $queueForEvents
* @param Usage $queueForUsage
* @param Log $log
* @return void
* @throws Exception
* @throws Throwable
*/ */
public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log): void public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log): void
{ {
@ -84,7 +92,6 @@ class Functions extends Action
$limit = 30; $limit = 30;
$sum = 30; $sum = 30;
$offset = 0; $offset = 0;
$functions = [];
/** @var Document[] $functions */ /** @var Document[] $functions */
while ($sum >= $limit) { while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [ $functions = $dbForProject->find('functions', [
@ -184,9 +191,28 @@ class Functions extends Action
} }
/** /**
* @param Log $log
* @param Database $dbForProject
* @param Func $queueForFunctions
* @param Usage $queueForUsage
* @param Event $queueForEvents
* @param Document $project
* @param Document $function
* @param string $trigger
* @param string $path
* @param string $method
* @param array $headers
* @param string|null $data
* @param Document|null $user
* @param string|null $jwt
* @param string|null $event
* @param string|null $eventData
* @param string|null $executionId
* @return void
* @throws Authorization * @throws Authorization
* @throws Throwable
* @throws Structure * @throws Structure
* @throws \Utopia\Database\Exception
* @throws \Utopia\Database\Exception\Conflict
*/ */
private function execute( private function execute(
Log $log, Log $log,
@ -209,7 +235,6 @@ class Functions extends Action
): void { ): void {
$user ??= new Document(); $user ??= new Document();
$functionId = $function->getId(); $functionId = $function->getId();
$functionInternalId = $function->getInternalId();
$deploymentId = $function->getAttribute('deployment', ''); $deploymentId = $function->getAttribute('deployment', '');
$log->addTag('functionId', $functionId); $log->addTag('functionId', $functionId);
@ -217,7 +242,6 @@ class Functions extends Action
/** Check if deployment exists */ /** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId); $deployment = $dbForProject->getDocument('deployments', $deploymentId);
$deploymentInternalId = $deployment->getInternalId();
if ($deployment->getAttribute('resourceId') !== $functionId) { if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function'); throw new Exception('Deployment not found. Create deployment before trying to execute a function');

View file

@ -7,10 +7,9 @@ use Exception;
use PHPMailer\PHPMailer\PHPMailer; use PHPMailer\PHPMailer\PHPMailer;
use Utopia\App; use Utopia\App;
use Utopia\CLI\Console; use Utopia\CLI\Console;
use Utopia\Database\Document;
use Utopia\Locale\Locale;
use Utopia\Platform\Action; use Utopia\Platform\Action;
use Utopia\Queue\Message; use Utopia\Queue\Message;
use Utopia\Registry\Registry;
class Mails extends Action class Mails extends Action
{ {
@ -32,10 +31,13 @@ class Mails extends Action
} }
/** /**
* @param Message $message
* @param Registry $register
* @throws \PHPMailer\PHPMailer\Exception * @throws \PHPMailer\PHPMailer\Exception
* @return void
* @throws Exception * @throws Exception
*/ */
public function action(Message $message, $register): void public function action(Message $message, Registry $register): void
{ {
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
@ -44,29 +46,30 @@ class Mails extends Action
throw new Exception('Missing payload'); throw new Exception('Missing payload');
} }
if (empty(App::getEnv('_APP_SMTP_HOST'))) { $smtp = $payload['smtp'];
Console::info('Skipped mail processing. No SMTP server hostname has been set.');
if (empty($smtp) && empty(App::getEnv('_APP_SMTP_HOST'))) {
Console::info('Skipped mail processing. No SMTP configuration has been set.');
return; return;
} }
$recipient = $payload['recipient']; $recipient = $payload['recipient'];
$subject = $payload['subject']; $subject = $payload['subject'];
$variables = $payload['variables'];
$name = $payload['name']; $name = $payload['name'];
$body = $payload['body'];
$from = $payload['from']; $body = Template::fromFile(__DIR__ . '/../config/locale/templates/email-base.tpl');
foreach ($variables as $key => $value) {
$body->setParam('{{' . $key . '}}', $value);
}
$body = $body->render();
/** @var PHPMailer $mail */ /** @var PHPMailer $mail */
$mail = $register->get('smtp'); $mail = empty($smtp)
? $register->get('smtp')
// Set project mail : $this->getMailer($smtp);
/*$register->get('smtp')
->setFrom(
App::getEnv('_APP_SYSTEM_EMAIL_ADDRESS', APP_EMAIL_TEAM),
($project->getId() === 'console')
? \urldecode(App::getEnv('_APP_SYSTEM_EMAIL_NAME', APP_NAME.' Server'))
: \sprintf(Locale::getText('account.emails.team'), $project->getAttribute('name')
)
);*/
$mail->clearAddresses(); $mail->clearAddresses();
$mail->clearAllRecipients(); $mail->clearAllRecipients();
@ -74,8 +77,6 @@ class Mails extends Action
$mail->clearAttachments(); $mail->clearAttachments();
$mail->clearBCCs(); $mail->clearBCCs();
$mail->clearCCs(); $mail->clearCCs();
$mail->setFrom(App::getEnv('_APP_SYSTEM_EMAIL_ADDRESS', APP_EMAIL_TEAM), (empty($from) ? \urldecode(App::getEnv('_APP_SYSTEM_EMAIL_NAME', APP_NAME . ' Server')) : $from));
$mail->addAddress($recipient, $name); $mail->addAddress($recipient, $name);
$mail->Subject = $subject; $mail->Subject = $subject;
$mail->Body = $body; $mail->Body = $body;
@ -87,4 +88,39 @@ class Mails extends Action
throw new Exception('Error sending mail: ' . $error->getMessage(), 500); throw new Exception('Error sending mail: ' . $error->getMessage(), 500);
} }
} }
/**
* @param array $smtp
* @return PHPMailer
* @throws \PHPMailer\PHPMailer\Exception
*/
protected function getMailer(array $smtp): PHPMailer
{
$mail = new PHPMailer(true);
$mail->isSMTP();
$username = $smtp['username'];
$password = $smtp['password'];
$mail->XMailer = 'Appwrite Mailer';
$mail->Host = $smtp['host'];
$mail->Port = $smtp['port'];
$mail->SMTPAuth = (!empty($username) && !empty($password));
$mail->Username = $username;
$mail->Password = $password;
$mail->SMTPSecure = $smtp['secure'];
$mail->SMTPAutoTLS = false;
$mail->CharSet = 'UTF-8';
$mail->setFrom($smtp['senderEmail'], $smtp['senderName']);
if (!empty($smtp['replyTo'])) {
$mail->addReplyTo($smtp['replyTo'], $smtp['senderName']);
}
$mail->isHTML();
return $mail;
}
} }

View file

@ -47,6 +47,8 @@ class Messaging extends Action
} }
/** /**
* @param Message $message
* @return void
* @throws Exception * @throws Exception
*/ */
public function action(Message $message): void public function action(Message $message): void
@ -54,15 +56,18 @@ class Messaging extends Action
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
if (empty($payload)) { if (empty($payload)) {
throw new Exception('Missing payload'); Console::error('Payload arg not found');
return;
} }
if (empty($payload['recipient'])) { if (empty($payload['recipient'])) {
throw new Exception('Missing recipient'); Console::error('Recipient arg not found');
return;
} }
if (empty($payload['message'])) { if (empty($payload['message'])) {
throw new Exception('Missing message'); Console::error('Message arg not found');
return;
} }
$sms = match ($this->dsn->getHost()) { $sms = match ($this->dsn->getHost()) {
@ -75,15 +80,15 @@ class Messaging extends Action
default => null default => null
}; };
$from = App::getEnv('_APP_SMS_FROM');
if (empty(App::getEnv('_APP_SMS_PROVIDER'))) { if (empty(App::getEnv('_APP_SMS_PROVIDER'))) {
Console::info('Skipped sms processing. No Phone provider has been set.'); Console::error('Skipped sms processing. No Phone provider has been set.');
return; return;
} }
$from = App::getEnv('_APP_SMS_FROM');
if (empty($from)) { if (empty($from)) {
Console::info('Skipped sms processing. No phone number has been set.'); Console::error('Skipped sms processing. No phone number has been set.');
return; return;
} }

View file

@ -0,0 +1,330 @@
<?php
namespace Appwrite\Platform\Workers;
use Exception;
use Utopia\Database\Document;
use Utopia\Database\Exception\Authorization;
use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Permission;
use Appwrite\Role;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\Helpers\ID;
use Utopia\Migration\Destinations\Appwrite as DestinationsAppwrite;
use Utopia\Migration\Resource;
use Utopia\Migration\Source;
use Utopia\Migration\Sources\Appwrite;
use Utopia\Migration\Sources\Firebase;
use Utopia\Migration\Sources\NHost;
use Utopia\Migration\Sources\Supabase;
use Utopia\Migration\Transfer;
class Migrations extends Action
{
private ?Database $dbForProject = null;
private ?Database $dbForConsole = null;
public static function getName(): string
{
return 'migrations';
}
/**
* @throws Exception
*/
public function __construct()
{
$this
->desc('Migrations worker')
->inject('message')
->inject('getProjectDB')
->inject('dbForConsole')
->callback(fn(Message $message, callable $getProjectDB, Database $dbForConsole) => $this->action($message, $getProjectDB, $dbForConsole));
}
/**
* @param Message $message
* @param callable $getProjectDB
* @param Database $dbForConsole
* @return void
* @throws Exception
*/
public function action(Message $message, callable $getProjectDB, Database $dbForConsole): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$events = $payload['events'] ?? [];
$project = new Document($payload['project'] ?? []);
$migration = new Document($payload['migration'] ?? []);
if ($project->getId() === 'console') {
return;
}
$this->dbForProject = $getProjectDB($project);
$this->dbForConsole = $dbForConsole;
/**
* Handle Event execution.
*/
if (! empty($events)) {
return;
}
$this->processMigration($project, $migration);
}
/**
* @param string $source
* @param array $credentials
* @return Source
* @throws Exception
*/
protected function processSource(string $source, array $credentials): Source
{
return match ($source) {
Firebase::getName() => new Firebase(
json_decode($credentials['serviceAccount'], true),
),
Supabase::getName() => new Supabase(
$credentials['endpoint'],
$credentials['apiKey'],
$credentials['databaseHost'],
'postgres',
$credentials['username'],
$credentials['password'],
$credentials['port'],
),
NHost::getName() => new NHost(
$credentials['subdomain'],
$credentials['region'],
$credentials['adminSecret'],
$credentials['database'],
$credentials['username'],
$credentials['password'],
$credentials['port'],
),
Appwrite::getName() => new Appwrite($credentials['projectId'], str_starts_with($credentials['endpoint'], 'http://localhost/v1') ? 'http://appwrite/v1' : $credentials['endpoint'], $credentials['apiKey']),
default => throw new \Exception('Invalid source type'),
};
}
/**
* @throws Authorization
* @throws Structure
* @throws Conflict
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function updateMigrationDocument(Document $migration, Document $project): Document
{
/** Trigger Realtime */
$allEvents = Event::generateEvents('migrations.[migrationId].update', [
'migrationId' => $migration->getId(),
]);
$target = Realtime::fromPayload(
event: $allEvents[0],
payload: $migration,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $migration->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
);
Realtime::send(
projectId: $project->getId(),
payload: $migration->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
);
return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration);
}
/**
* @param Document $apiKey
* @return void
* @throws \Utopia\Database\Exception
* @throws Authorization
* @throws Conflict
* @throws Restricted
* @throws Structure
*/
protected function removeAPIKey(Document $apiKey): void
{
$this->dbForConsole->deleteDocument('keys', $apiKey->getId());
}
/**
* @param Document $project
* @return Document
* @throws Authorization
* @throws Structure
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function generateAPIKey(Document $project): Document
{
$generatedSecret = bin2hex(\random_bytes(128));
$key = new Document([
'$id' => ID::unique(),
'$permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'projectInternalId' => $project->getInternalId(),
'projectId' => $project->getId(),
'name' => 'Transfer API Key',
'scopes' => [
'users.read',
'users.write',
'teams.read',
'teams.write',
'databases.read',
'databases.write',
'collections.read',
'collections.write',
'documents.read',
'documents.write',
'buckets.read',
'buckets.write',
'files.read',
'files.write',
'functions.read',
'functions.write',
],
'expire' => null,
'sdks' => [],
'accessedAt' => null,
'secret' => $generatedSecret,
]);
$this->dbForConsole->createDocument('keys', $key);
$this->dbForConsole->deleteCachedDocument('projects', $project->getId());
return $key;
}
/**
* @param Document $project
* @param Document $migration
* @return void
* @throws Authorization
* @throws Conflict
* @throws Restricted
* @throws Structure
* @throws \Utopia\Database\Exception
*/
protected function processMigration(Document $project, Document $migration): void
{
/**
* @var Document $migrationDocument
* @var Transfer $transfer
*/
$migrationDocument = null;
$transfer = null;
$projectDocument = $this->dbForConsole->getDocument('projects', $project->getId());
$tempAPIKey = $this->generateAPIKey($projectDocument);
try {
$migrationDocument = $this->dbForProject->getDocument('migrations', $migration->getId());
$migrationDocument->setAttribute('stage', 'processing');
$migrationDocument->setAttribute('status', 'processing');
$this->updateMigrationDocument($migrationDocument, $projectDocument);
$source = $this->processSource($migrationDocument->getAttribute('source'), $migrationDocument->getAttribute('credentials'));
$source->report();
$destination = new DestinationsAppwrite(
$projectDocument->getId(),
'http://appwrite/v1',
$tempAPIKey['secret'],
);
$transfer = new Transfer(
$source,
$destination
);
/** Start Transfer */
$migrationDocument->setAttribute('stage', 'migrating');
$this->updateMigrationDocument($migrationDocument, $projectDocument);
$transfer->run($migrationDocument->getAttribute('resources'), function () use ($migrationDocument, $transfer, $projectDocument) {
$migrationDocument->setAttribute('resourceData', json_encode($transfer->getCache()));
$migrationDocument->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
$this->updateMigrationDocument($migrationDocument, $projectDocument);
});
$errors = $transfer->getReport(Resource::STATUS_ERROR);
if (count($errors) > 0) {
$migrationDocument->setAttribute('status', 'failed');
$migrationDocument->setAttribute('stage', 'finished');
$errorMessages = [];
foreach ($errors as $error) {
$errorMessages[] = "Failed to transfer resource '{$error['id']}:{$error['resource']}' with message '{$error['message']}'";
}
$migrationDocument->setAttribute('errors', $errorMessages);
$this->updateMigrationDocument($migrationDocument, $projectDocument);
return;
}
$migrationDocument->setAttribute('status', 'completed');
$migrationDocument->setAttribute('stage', 'finished');
} catch (\Throwable $th) {
Console::error($th->getMessage());
if ($migrationDocument) {
Console::error($th->getMessage());
Console::error($th->getTraceAsString());
$migrationDocument->setAttribute('status', 'failed');
$migrationDocument->setAttribute('stage', 'finished');
$migrationDocument->setAttribute('errors', [$th->getMessage()]);
return;
}
if ($transfer) {
$errors = $transfer->getReport(Resource::STATUS_ERROR);
if (count($errors) > 0) {
$migrationDocument->setAttribute('status', 'failed');
$migrationDocument->setAttribute('stage', 'finished');
$migrationDocument->setAttribute('errors', $errors);
}
}
} finally {
if ($migrationDocument) {
$this->updateMigrationDocument($migrationDocument, $projectDocument);
}
if ($tempAPIKey) {
$this->removeAPIKey($tempAPIKey);
}
}
}
}

View file

@ -42,7 +42,11 @@ class Usage extends Action
} }
/** /**
* @throws Exception * @param Message $message
* @param $pools
* @param $cache
* @return void
* @throws \Utopia\Database\Exception
*/ */
public function action(Message $message, $pools, $cache): void public function action(Message $message, $pools, $cache): void
{ {
@ -81,10 +85,18 @@ class Usage extends Action
} }
/** /**
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files. * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
* When we remove a parent document we need to deduct his children aggregation from the project scope. * When we remove a parent document we need to deduct his children aggregation from the project scope.
*/
* @param $database
* @param $projectInternalId
* @param Document $document
* @param array $metrics
* @param $pools
* @param $cache
* @return void
*/
private function reduce($database, $projectInternalId, Document $document, array &$metrics, $pools, $cache) private function reduce($database, $projectInternalId, Document $document, array &$metrics, $pools, $cache)
{ {
try { try {

View file

@ -4,7 +4,6 @@ namespace Appwrite\Platform\Workers;
use Utopia\App; use Utopia\App;
use Utopia\Database\Database; use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate; use Utopia\Database\Exception\Duplicate;
use Utopia\Platform\Action; use Utopia\Platform\Action;
@ -32,6 +31,12 @@ class UsageHook extends Usage
; ;
} }
/**
* @param $register
* @param $cache
* @param $pools
* @return void
*/
public function action($register, $cache, $pools): void public function action($register, $cache, $pools): void
{ {
Timer::tick(30000, function () use ($register, $cache, $pools) { Timer::tick(30000, function () use ($register, $cache, $pools) {

View file

@ -10,7 +10,7 @@ use Utopia\Queue\Message;
class Webhooks extends Action class Webhooks extends Action
{ {
private $errors = []; private array $errors = [];
public static function getName(): string public static function getName(): string
{ {
@ -29,6 +29,8 @@ class Webhooks extends Action
} }
/** /**
* @param Message $message
* @return void
* @throws Exception * @throws Exception
*/ */
public function action(Message $message): void public function action(Message $message): void
@ -50,14 +52,19 @@ class Webhooks extends Action
} }
} }
if (!empty($errors)) { if (!empty($this->errors)) {
throw new Exception(\implode(" / \n\n", $errors)); throw new Exception(\implode(" / \n\n", $this->errors));
} }
$this->errors = [];
} }
/**
* @param array $events
* @param string $payload
* @param Document $webhook
* @param Document $user
* @param Document $project
* @return void
*/
private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project): void private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project): void
{ {
@ -67,7 +74,7 @@ class Webhooks extends Action
$httpUser = $webhook->getAttribute('httpUser'); $httpUser = $webhook->getAttribute('httpUser');
$httpPass = $webhook->getAttribute('httpPass'); $httpPass = $webhook->getAttribute('httpPass');
$ch = \curl_init($webhook->getAttribute('url')); $ch = \curl_init($webhook->getAttribute('url'));
var_dump($url);
\curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST'); \curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST');
\curl_setopt($ch, CURLOPT_POSTFIELDS, $payload); \curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
\curl_setopt($ch, CURLOPT_HEADER, 0); \curl_setopt($ch, CURLOPT_HEADER, 0);