From 9fb65250632897746b60aaaa331390c5402fc396 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Wed, 22 Nov 2023 14:50:57 +0100 Subject: [PATCH] Add new tags from worker, fix queue name --- app/worker.php | 16 +++++------ src/Appwrite/Platform/Workers/Builds.php | 25 ++++++++++++----- .../Platform/Workers/Certificates.php | 9 +++++-- src/Appwrite/Platform/Workers/Databases.php | 12 +++++++-- src/Appwrite/Platform/Workers/Deletes.php | 10 +++++-- src/Appwrite/Platform/Workers/Functions.php | 27 ++++++++++++------- src/Appwrite/Platform/Workers/Mails.php | 9 +++++-- src/Appwrite/Platform/Workers/Messaging.php | 9 +++++-- src/Appwrite/Platform/Workers/Migrations.php | 16 ++++++++--- src/Appwrite/Platform/Workers/Webhooks.php | 9 +++++-- 10 files changed, 102 insertions(+), 40 deletions(-) diff --git a/app/worker.php b/app/worker.php index 32a8b9804..aa51bc9ab 100644 --- a/app/worker.php +++ b/app/worker.php @@ -199,6 +199,12 @@ if (!empty($workerIndex)) { $workerName .= '_' . $workerIndex; } +if (\str_starts_with($workerName, 'databases')) { + $queueName = App::getEnv('_APP_QUEUE_NAME', 'database_db_main'); +} else { + $queueName = App::getEnv('_APP_QUEUE_NAME', 'v1-' . strtolower($workerName)); +} + try { /** * Any worker can be configured with the following env vars: @@ -206,12 +212,6 @@ try { * - _APP_WORKER_PER_CORE The number of worker processes per core (ignored if _APP_WORKERS_NUM is set) * - _APP_QUEUE_NAME The name of the queue to read for database events */ - if ($workerName === 'databases') { - $queueName = App::getEnv('_APP_QUEUE_NAME', 'database_db_main'); - } else { - $queueName = App::getEnv('_APP_QUEUE_NAME', 'v1-' . strtolower($workerName)); - } - $platform->init(Service::TYPE_WORKER, [ 'workersNum' => App::getEnv('_APP_WORKERS_NUM', 1), 'connection' => $pools->get('queue')->pop()->getResource(), @@ -237,7 +237,7 @@ $worker ->inject('error') ->inject('logger') ->inject('log') - ->action(function (Throwable $error, ?Logger $logger, Log $log) { + ->action(function (Throwable $error, ?Logger $logger, Log $log) use ($queueName) { $version = App::getEnv('_APP_VERSION', 'UNKNOWN'); if ($error instanceof PDOException) { @@ -250,7 +250,7 @@ $worker $log->setVersion($version); $log->setType(Log::TYPE_ERROR); $log->setMessage($error->getMessage()); - $log->setAction('appwrite-queue-' . App::getEnv('QUEUE')); + $log->setAction('appwrite-queue-' . $queueName); $log->addTag('verboseType', get_class($error)); $log->addTag('code', $error->getCode()); $log->addExtra('file', $error->getFile()); diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index f1ef0df60..d2b5b9f48 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -24,6 +24,7 @@ use Utopia\Database\Exception\Restricted; use Utopia\Database\Validator\Authorization; use Utopia\Database\Exception\Structure; use Utopia\Database\Helpers\ID; +use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; use Utopia\Storage\Device\Local; @@ -52,7 +53,8 @@ class Builds extends Action ->inject('cache') ->inject('dbForProject') ->inject('getFunctionsDevice') - ->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $getFunctionsDevice)); + ->inject('log') + ->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice, Log $log) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $getFunctionsDevice, $log)); } /** @@ -64,10 +66,11 @@ class Builds extends Action * @param Cache $cache * @param Database $dbForProject * @param callable $getFunctionsDevice + * @param Log $log * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice): void + public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice, Log $log): void { $payload = $message->getPayload() ?? []; @@ -81,12 +84,15 @@ class Builds extends Action $deployment = new Document($payload['deployment'] ?? []); $template = new Document($payload['template'] ?? []); + $log->addTag('projectId', $project->getId()); + $log->addTag('type', $type); + switch ($type) { case BUILD_TYPE_DEPLOYMENT: case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($getFunctionsDevice, $queueForFunctions, $queueForEvents, $usage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template); + $this->buildDeployment($getFunctionsDevice, $queueForFunctions, $queueForEvents, $usage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log); break; default: @@ -106,20 +112,27 @@ class Builds extends Action * @param Document $function * @param Document $deployment * @param Document $template + * @param Log $log * @return void * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(callable $getFunctionsDevice, Func $queueForFunctions, Event $queueForEvents, Stats $usage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template): void + protected function buildDeployment(callable $getFunctionsDevice, Func $queueForFunctions, Event $queueForEvents, Stats $usage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void { $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); - $function = $dbForProject->getDocument('functions', $function->getId()); + $functionId = $function->getId(); + $log->addTag('functionId', $function->getId()); + + $function = $dbForProject->getDocument('functions', $functionId); if ($function->isEmpty()) { throw new Exception('Function not found', 404); } - $deployment = $dbForProject->getDocument('deployments', $deployment->getId()); + $deploymentId = $deployment->getId(); + $log->addTag('deploymentId', $deploymentId); + + $deployment = $dbForProject->getDocument('deployments', $deploymentId); if ($deployment->isEmpty()) { throw new Exception('Deployment not found', 404); } diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index ade2125e4..072165499 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -23,6 +23,7 @@ use Utopia\Database\Helpers\ID; use Utopia\Database\Query; use Utopia\Domains\Domain; use Utopia\Locale\Locale; +use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; @@ -45,7 +46,8 @@ class Certificates extends Action ->inject('queueForMails') ->inject('queueForEvents') ->inject('queueForFunctions') - ->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions)); + ->inject('log') + ->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log)); } /** @@ -54,11 +56,12 @@ class Certificates extends Action * @param Mail $queueForMails * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Log $log * @return void * @throws Throwable * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions): void + public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log): void { $payload = $message->getPayload() ?? []; @@ -70,6 +73,8 @@ class Certificates extends Action $domain = new Domain($document->getAttribute('domain', '')); $skipRenewCheck = $payload['skipRenewCheck'] ?? false; + $log->addTag('domain', $domain->get()); + $this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $skipRenewCheck); } diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index e0ec75e1d..c397d744d 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -16,6 +16,7 @@ use Utopia\Database\Exception\Restricted; use Utopia\Database\Exception\Structure; use Utopia\Database\Exception as DatabaseException; use Utopia\Database\Query; +use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; @@ -36,17 +37,19 @@ class Databases extends Action ->inject('message') ->inject('dbForConsole') ->inject('dbForProject') - ->callback(fn($message, $dbForConsole, $dbForProject) => $this->action($message, $dbForConsole, $dbForProject)); + ->inject('log') + ->callback(fn(Message $message, Database $dbForConsole, Database $dbForProject, Log $log) => $this->action($message, $dbForConsole, $dbForProject, $log)); } /** * @param Message $message * @param Database $dbForConsole * @param Database $dbForProject + * @param Log $log * @return void * @throws \Exception */ - public function action(Message $message, Database $dbForConsole, Database $dbForProject): void + public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log): void { $payload = $message->getPayload() ?? []; @@ -60,10 +63,15 @@ class Databases extends Action $document = new Document($payload['document'] ?? []); $database = new Document($payload['database'] ?? []); + $log->addTag('projectId', $project->getId()); + $log->addTag('type', $type); + if ($database->isEmpty()) { throw new Exception('Missing database'); } + $log->addTag('databaseId', $database->getId()); + match (strval($type)) { DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject), DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject), diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index b95a13a12..c4793c249 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -2,6 +2,7 @@ namespace Appwrite\Platform\Workers; +use Appwrite\Auth\Auth; use Executor\Executor; use Throwable; use Utopia\Abuse\Abuse; @@ -20,6 +21,7 @@ use Utopia\Database\Exception\Conflict; use Utopia\Database\Exception\Restricted; use Utopia\Database\Exception\Structure; use Utopia\Database\Query; +use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; use Utopia\Storage\Device; @@ -45,14 +47,15 @@ class Deletes extends Action ->inject('getFunctionsDevice') ->inject('getBuildsDevice') ->inject('getCacheDevice') - ->callback(fn ($message, $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice)); + ->inject('log') + ->callback(fn ($message, $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Log $log) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $log)); } /** * @throws Exception * @throws Throwable */ - public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice): void + public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Log $log): void { $payload = $message->getPayload() ?? []; @@ -67,6 +70,9 @@ class Deletes extends Action $document = new Document($payload['document'] ?? []); $project = new Document($payload['project'] ?? []); + $log->addTag('projectId', $project->getId()); + $log->addTag('type', $type); + switch (strval($type)) { case DELETE_TYPE_DOCUMENT: switch ($document->getCollection()) { diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 7a6cfe115..131834aad 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -90,6 +90,10 @@ class Functions extends Action return; } + $log->addTag('functionId', $function->getId()); + $log->addTag('projectId', $project->getId()); + $log->addTag('type', $type); + if (!empty($events)) { $limit = 30; $sum = 30; @@ -236,15 +240,14 @@ class Functions extends Action string $eventData = null, string $executionId = null, ): void { - $user ??= new Document(); - $functionId = $function->getId(); - $deploymentId = $function->getAttribute('deployment', ''); + $user ??= new Document(); + $functionId = $function->getId(); + $deploymentId = $function->getAttribute('deployment', ''); - $log->addTag('functionId', $functionId); - $log->addTag('projectId', $project->getId()); + $log->addTag('deploymentId', $deploymentId); - /** Check if deployment exists */ - $deployment = $dbForProject->getDocument('deployments', $deploymentId); + /** Check if deployment exists */ + $deployment = $dbForProject->getDocument('deployments', $deploymentId); if ($deployment->getAttribute('resourceId') !== $functionId) { throw new Exception('Deployment not found. Create deployment before trying to execute a function'); @@ -254,8 +257,12 @@ class Functions extends Action throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } - /** Check if build has exists */ - $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); + $buildId = $deployment->getAttribute('buildId', ''); + + $log->addTag('buildId', $buildId); + + /** Check if build has exists */ + $build = $dbForProject->getDocument('builds', $buildId); if ($build->isEmpty()) { throw new Exception('Build not found'); } @@ -264,7 +271,7 @@ class Functions extends Action throw new Exception('Build not ready'); } - /** Check if runtime is supported */ + /** Check if runtime is supported */ $version = $function->getAttribute('version', 'v2'); $runtimes = Config::getParam($version === 'v2' ? 'runtimes-v2' : 'runtimes', []); diff --git a/src/Appwrite/Platform/Workers/Mails.php b/src/Appwrite/Platform/Workers/Mails.php index 94a2a4608..ea9b77a8c 100644 --- a/src/Appwrite/Platform/Workers/Mails.php +++ b/src/Appwrite/Platform/Workers/Mails.php @@ -8,6 +8,7 @@ use PHPMailer\PHPMailer\PHPMailer; use Swoole\Runtime; use Utopia\App; use Utopia\CLI\Console; +use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; use Utopia\Registry\Registry; @@ -28,17 +29,19 @@ class Mails extends Action ->desc('Mails worker') ->inject('message') ->inject('register') - ->callback(fn($message, $register) => $this->action($message, $register)); + ->inject('log') + ->callback(fn(Message $message, Registry $register, Log $log) => $this->action($message, $register, $log)); } /** * @param Message $message * @param Registry $register + * @param Log $log * @throws \PHPMailer\PHPMailer\Exception * @return void * @throws Exception */ - public function action(Message $message, Registry $register): void + public function action(Message $message, Registry $register, Log $log): void { Runtime::setHookFlags(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP); $payload = $message->getPayload() ?? []; @@ -54,6 +57,8 @@ class Mails extends Action return; } + $log->addTag('type', empty($smtp) ? 'cloud' : 'smtp'); + $recipient = $payload['recipient']; $subject = $payload['subject']; $variables = $payload['variables']; diff --git a/src/Appwrite/Platform/Workers/Messaging.php b/src/Appwrite/Platform/Workers/Messaging.php index 76b86e4f0..874de8413 100644 --- a/src/Appwrite/Platform/Workers/Messaging.php +++ b/src/Appwrite/Platform/Workers/Messaging.php @@ -6,6 +6,7 @@ use Exception; use Utopia\App; use Utopia\CLI\Console; use Utopia\DSN\DSN; +use Utopia\Logger\Log; use Utopia\Messaging\Messages\Sms; use Utopia\Messaging\Adapters\SMS\Mock; use Utopia\Messaging\Adapters\SMS\Msg91; @@ -43,15 +44,17 @@ class Messaging extends Action $this ->desc('Messaging worker') ->inject('message') - ->callback(fn($message) => $this->action($message)); + ->inject('log') + ->callback(fn(Message $message, Log $log) => $this->action($message, $log)); } /** * @param Message $message + * @param Log $log * @return void * @throws Exception */ - public function action(Message $message): void + public function action(Message $message, Log $log): void { $payload = $message->getPayload() ?? []; @@ -70,6 +73,8 @@ class Messaging extends Action return; } + $log->addTag('type', $this->dsn->getHost()); + $sms = match ($this->dsn->getHost()) { 'mock' => new Mock($this->user, $this->secret), // used for tests 'twilio' => new Twilio($this->user, $this->secret), diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 31b0df59a..95c31d07a 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -17,6 +17,7 @@ use Appwrite\Role; use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\Helpers\ID; +use Utopia\Logger\Log; use Utopia\Migration\Destinations\Appwrite as DestinationsAppwrite; use Utopia\Migration\Resource; use Utopia\Migration\Source; @@ -46,17 +47,19 @@ class Migrations extends Action ->inject('message') ->inject('dbForProject') ->inject('dbForConsole') - ->callback(fn(Message $message, Database $dbForProject, Database $dbForConsole) => $this->action($message, $dbForProject, $dbForConsole)); + ->inject('log') + ->callback(fn(Message $message, Database $dbForProject, Database $dbForConsole, Log $log) => $this->action($message, $dbForProject, $dbForConsole, $log)); } /** * @param Message $message * @param Database $dbForProject * @param Database $dbForConsole + * @param Log $log * @return void * @throws Exception */ - public function action(Message $message, Database $dbForProject, Database $dbForConsole): void + public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void { $payload = $message->getPayload() ?? []; @@ -82,7 +85,9 @@ class Migrations extends Action return; } - $this->processMigration($project, $migration); + $log->addTag('projectId', $project->getId()); + + $this->processMigration($project, $migration, $log); } /** @@ -228,6 +233,7 @@ class Migrations extends Action /** * @param Document $project * @param Document $migration + * @param Log $log * @return void * @throws Authorization * @throws Conflict @@ -235,7 +241,7 @@ class Migrations extends Action * @throws Structure * @throws \Utopia\Database\Exception */ - protected function processMigration(Document $project, Document $migration): void + protected function processMigration(Document $project, Document $migration, Log $log): void { /** * @var Document $migrationDocument @@ -252,6 +258,8 @@ class Migrations extends Action $migrationDocument->setAttribute('status', 'processing'); $this->updateMigrationDocument($migrationDocument, $projectDocument); + $log->addTag('type', $migrationDocument->getAttribute('source')); + $source = $this->processSource($migrationDocument->getAttribute('source'), $migrationDocument->getAttribute('credentials')); $source->report(); diff --git a/src/Appwrite/Platform/Workers/Webhooks.php b/src/Appwrite/Platform/Workers/Webhooks.php index fa25145a1..28bc2a731 100644 --- a/src/Appwrite/Platform/Workers/Webhooks.php +++ b/src/Appwrite/Platform/Workers/Webhooks.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Workers; use Exception; use Utopia\App; use Utopia\Database\Document; +use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; @@ -25,15 +26,17 @@ class Webhooks extends Action $this ->desc('Webhooks worker') ->inject('message') - ->callback(fn ($message) => $this->action($message)); + ->inject('log') + ->callback(fn (Message $message, Log $log) => $this->action($message, $log)); } /** * @param Message $message + * @param Log $log * @return void * @throws Exception */ - public function action(Message $message): void + public function action(Message $message, Log $log): void { $payload = $message->getPayload() ?? []; @@ -46,6 +49,8 @@ class Webhooks extends Action $project = new Document($payload['project']); $user = new Document($payload['user'] ?? []); + $log->addTag('projectId', $project->getId()); + foreach ($project->getAttribute('webhooks', []) as $webhook) { if (array_intersect($webhook->getAttribute('events', []), $events)) { $this->execute($events, $webhookPayload, $webhook, $user, $project);