1
0
Fork 0
mirror of synced 2024-06-26 10:10:57 +12:00

Add new tags from worker, fix queue name

This commit is contained in:
Matej Bačo 2023-11-22 14:50:57 +01:00
parent 540ff6ee2e
commit 9fb6525063
10 changed files with 102 additions and 40 deletions

View file

@ -199,6 +199,12 @@ if (!empty($workerIndex)) {
$workerName .= '_' . $workerIndex;
}
if (\str_starts_with($workerName, 'databases')) {
$queueName = App::getEnv('_APP_QUEUE_NAME', 'database_db_main');
} else {
$queueName = App::getEnv('_APP_QUEUE_NAME', 'v1-' . strtolower($workerName));
}
try {
/**
* Any worker can be configured with the following env vars:
@ -206,12 +212,6 @@ try {
* - _APP_WORKER_PER_CORE The number of worker processes per core (ignored if _APP_WORKERS_NUM is set)
* - _APP_QUEUE_NAME The name of the queue to read for database events
*/
if ($workerName === 'databases') {
$queueName = App::getEnv('_APP_QUEUE_NAME', 'database_db_main');
} else {
$queueName = App::getEnv('_APP_QUEUE_NAME', 'v1-' . strtolower($workerName));
}
$platform->init(Service::TYPE_WORKER, [
'workersNum' => App::getEnv('_APP_WORKERS_NUM', 1),
'connection' => $pools->get('queue')->pop()->getResource(),
@ -237,7 +237,7 @@ $worker
->inject('error')
->inject('logger')
->inject('log')
->action(function (Throwable $error, ?Logger $logger, Log $log) {
->action(function (Throwable $error, ?Logger $logger, Log $log) use ($queueName) {
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
if ($error instanceof PDOException) {
@ -250,7 +250,7 @@ $worker
$log->setVersion($version);
$log->setType(Log::TYPE_ERROR);
$log->setMessage($error->getMessage());
$log->setAction('appwrite-queue-' . App::getEnv('QUEUE'));
$log->setAction('appwrite-queue-' . $queueName);
$log->addTag('verboseType', get_class($error));
$log->addTag('code', $error->getCode());
$log->addExtra('file', $error->getFile());

View file

@ -24,6 +24,7 @@ use Utopia\Database\Exception\Restricted;
use Utopia\Database\Validator\Authorization;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Helpers\ID;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Storage\Device\Local;
@ -52,7 +53,8 @@ class Builds extends Action
->inject('cache')
->inject('dbForProject')
->inject('getFunctionsDevice')
->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $getFunctionsDevice));
->inject('log')
->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice, Log $log) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $getFunctionsDevice, $log));
}
/**
@ -64,10 +66,11 @@ class Builds extends Action
* @param Cache $cache
* @param Database $dbForProject
* @param callable $getFunctionsDevice
* @param Log $log
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice): void
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -81,12 +84,15 @@ class Builds extends Action
$deployment = new Document($payload['deployment'] ?? []);
$template = new Document($payload['template'] ?? []);
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
switch ($type) {
case BUILD_TYPE_DEPLOYMENT:
case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache);
$this->buildDeployment($getFunctionsDevice, $queueForFunctions, $queueForEvents, $usage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template);
$this->buildDeployment($getFunctionsDevice, $queueForFunctions, $queueForEvents, $usage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log);
break;
default:
@ -106,20 +112,27 @@ class Builds extends Action
* @param Document $function
* @param Document $deployment
* @param Document $template
* @param Log $log
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function buildDeployment(callable $getFunctionsDevice, Func $queueForFunctions, Event $queueForEvents, Stats $usage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template): void
protected function buildDeployment(callable $getFunctionsDevice, Func $queueForFunctions, Event $queueForEvents, Stats $usage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void
{
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
$function = $dbForProject->getDocument('functions', $function->getId());
$functionId = $function->getId();
$log->addTag('functionId', $function->getId());
$function = $dbForProject->getDocument('functions', $functionId);
if ($function->isEmpty()) {
throw new Exception('Function not found', 404);
}
$deployment = $dbForProject->getDocument('deployments', $deployment->getId());
$deploymentId = $deployment->getId();
$log->addTag('deploymentId', $deploymentId);
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->isEmpty()) {
throw new Exception('Deployment not found', 404);
}

View file

@ -23,6 +23,7 @@ use Utopia\Database\Helpers\ID;
use Utopia\Database\Query;
use Utopia\Domains\Domain;
use Utopia\Locale\Locale;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
@ -45,7 +46,8 @@ class Certificates extends Action
->inject('queueForMails')
->inject('queueForEvents')
->inject('queueForFunctions')
->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions));
->inject('log')
->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log));
}
/**
@ -54,11 +56,12 @@ class Certificates extends Action
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Log $log
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions): void
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -70,6 +73,8 @@ class Certificates extends Action
$domain = new Domain($document->getAttribute('domain', ''));
$skipRenewCheck = $payload['skipRenewCheck'] ?? false;
$log->addTag('domain', $domain->get());
$this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $skipRenewCheck);
}

View file

@ -16,6 +16,7 @@ use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Exception as DatabaseException;
use Utopia\Database\Query;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
@ -36,17 +37,19 @@ class Databases extends Action
->inject('message')
->inject('dbForConsole')
->inject('dbForProject')
->callback(fn($message, $dbForConsole, $dbForProject) => $this->action($message, $dbForConsole, $dbForProject));
->inject('log')
->callback(fn(Message $message, Database $dbForConsole, Database $dbForProject, Log $log) => $this->action($message, $dbForConsole, $dbForProject, $log));
}
/**
* @param Message $message
* @param Database $dbForConsole
* @param Database $dbForProject
* @param Log $log
* @return void
* @throws \Exception
*/
public function action(Message $message, Database $dbForConsole, Database $dbForProject): void
public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -60,10 +63,15 @@ class Databases extends Action
$document = new Document($payload['document'] ?? []);
$database = new Document($payload['database'] ?? []);
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
if ($database->isEmpty()) {
throw new Exception('Missing database');
}
$log->addTag('databaseId', $database->getId());
match (strval($type)) {
DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject),
DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject),

View file

@ -2,6 +2,7 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Auth\Auth;
use Executor\Executor;
use Throwable;
use Utopia\Abuse\Abuse;
@ -20,6 +21,7 @@ use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Query;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Storage\Device;
@ -45,14 +47,15 @@ class Deletes extends Action
->inject('getFunctionsDevice')
->inject('getBuildsDevice')
->inject('getCacheDevice')
->callback(fn ($message, $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice));
->inject('log')
->callback(fn ($message, $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Log $log) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $log));
}
/**
* @throws Exception
* @throws Throwable
*/
public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice): void
public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -67,6 +70,9 @@ class Deletes extends Action
$document = new Document($payload['document'] ?? []);
$project = new Document($payload['project'] ?? []);
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
switch (strval($type)) {
case DELETE_TYPE_DOCUMENT:
switch ($document->getCollection()) {

View file

@ -90,6 +90,10 @@ class Functions extends Action
return;
}
$log->addTag('functionId', $function->getId());
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
if (!empty($events)) {
$limit = 30;
$sum = 30;
@ -236,15 +240,14 @@ class Functions extends Action
string $eventData = null,
string $executionId = null,
): void {
$user ??= new Document();
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
$user ??= new Document();
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
$log->addTag('functionId', $functionId);
$log->addTag('projectId', $project->getId());
$log->addTag('deploymentId', $deploymentId);
/** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
/** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
@ -254,8 +257,12 @@ class Functions extends Action
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
}
/** Check if build has exists */
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
$buildId = $deployment->getAttribute('buildId', '');
$log->addTag('buildId', $buildId);
/** Check if build has exists */
$build = $dbForProject->getDocument('builds', $buildId);
if ($build->isEmpty()) {
throw new Exception('Build not found');
}
@ -264,7 +271,7 @@ class Functions extends Action
throw new Exception('Build not ready');
}
/** Check if runtime is supported */
/** Check if runtime is supported */
$version = $function->getAttribute('version', 'v2');
$runtimes = Config::getParam($version === 'v2' ? 'runtimes-v2' : 'runtimes', []);

View file

@ -8,6 +8,7 @@ use PHPMailer\PHPMailer\PHPMailer;
use Swoole\Runtime;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Registry\Registry;
@ -28,17 +29,19 @@ class Mails extends Action
->desc('Mails worker')
->inject('message')
->inject('register')
->callback(fn($message, $register) => $this->action($message, $register));
->inject('log')
->callback(fn(Message $message, Registry $register, Log $log) => $this->action($message, $register, $log));
}
/**
* @param Message $message
* @param Registry $register
* @param Log $log
* @throws \PHPMailer\PHPMailer\Exception
* @return void
* @throws Exception
*/
public function action(Message $message, Registry $register): void
public function action(Message $message, Registry $register, Log $log): void
{
Runtime::setHookFlags(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP);
$payload = $message->getPayload() ?? [];
@ -54,6 +57,8 @@ class Mails extends Action
return;
}
$log->addTag('type', empty($smtp) ? 'cloud' : 'smtp');
$recipient = $payload['recipient'];
$subject = $payload['subject'];
$variables = $payload['variables'];

View file

@ -6,6 +6,7 @@ use Exception;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\DSN\DSN;
use Utopia\Logger\Log;
use Utopia\Messaging\Messages\Sms;
use Utopia\Messaging\Adapters\SMS\Mock;
use Utopia\Messaging\Adapters\SMS\Msg91;
@ -43,15 +44,17 @@ class Messaging extends Action
$this
->desc('Messaging worker')
->inject('message')
->callback(fn($message) => $this->action($message));
->inject('log')
->callback(fn(Message $message, Log $log) => $this->action($message, $log));
}
/**
* @param Message $message
* @param Log $log
* @return void
* @throws Exception
*/
public function action(Message $message): void
public function action(Message $message, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -70,6 +73,8 @@ class Messaging extends Action
return;
}
$log->addTag('type', $this->dsn->getHost());
$sms = match ($this->dsn->getHost()) {
'mock' => new Mock($this->user, $this->secret), // used for tests
'twilio' => new Twilio($this->user, $this->secret),

View file

@ -17,6 +17,7 @@ use Appwrite\Role;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\Helpers\ID;
use Utopia\Logger\Log;
use Utopia\Migration\Destinations\Appwrite as DestinationsAppwrite;
use Utopia\Migration\Resource;
use Utopia\Migration\Source;
@ -46,17 +47,19 @@ class Migrations extends Action
->inject('message')
->inject('dbForProject')
->inject('dbForConsole')
->callback(fn(Message $message, Database $dbForProject, Database $dbForConsole) => $this->action($message, $dbForProject, $dbForConsole));
->inject('log')
->callback(fn(Message $message, Database $dbForProject, Database $dbForConsole, Log $log) => $this->action($message, $dbForProject, $dbForConsole, $log));
}
/**
* @param Message $message
* @param Database $dbForProject
* @param Database $dbForConsole
* @param Log $log
* @return void
* @throws Exception
*/
public function action(Message $message, Database $dbForProject, Database $dbForConsole): void
public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -82,7 +85,9 @@ class Migrations extends Action
return;
}
$this->processMigration($project, $migration);
$log->addTag('projectId', $project->getId());
$this->processMigration($project, $migration, $log);
}
/**
@ -228,6 +233,7 @@ class Migrations extends Action
/**
* @param Document $project
* @param Document $migration
* @param Log $log
* @return void
* @throws Authorization
* @throws Conflict
@ -235,7 +241,7 @@ class Migrations extends Action
* @throws Structure
* @throws \Utopia\Database\Exception
*/
protected function processMigration(Document $project, Document $migration): void
protected function processMigration(Document $project, Document $migration, Log $log): void
{
/**
* @var Document $migrationDocument
@ -252,6 +258,8 @@ class Migrations extends Action
$migrationDocument->setAttribute('status', 'processing');
$this->updateMigrationDocument($migrationDocument, $projectDocument);
$log->addTag('type', $migrationDocument->getAttribute('source'));
$source = $this->processSource($migrationDocument->getAttribute('source'), $migrationDocument->getAttribute('credentials'));
$source->report();

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Workers;
use Exception;
use Utopia\App;
use Utopia\Database\Document;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
@ -25,15 +26,17 @@ class Webhooks extends Action
$this
->desc('Webhooks worker')
->inject('message')
->callback(fn ($message) => $this->action($message));
->inject('log')
->callback(fn (Message $message, Log $log) => $this->action($message, $log));
}
/**
* @param Message $message
* @param Log $log
* @return void
* @throws Exception
*/
public function action(Message $message): void
public function action(Message $message, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -46,6 +49,8 @@ class Webhooks extends Action
$project = new Document($payload['project']);
$user = new Document($payload['user'] ?? []);
$log->addTag('projectId', $project->getId());
foreach ($project->getAttribute('webhooks', []) as $webhook) {
if (array_intersect($webhook->getAttribute('events', []), $events)) {
$this->execute($events, $webhookPayload, $webhook, $user, $project);