Merge pull request #7192 from appwrite/feat-improve-logging
Feat: Improve worker logging
This commit is contained in:
commit
d87eb14afe
10 changed files with 119 additions and 63 deletions
|
@ -202,6 +202,12 @@ if (!empty($workerIndex)) {
|
|||
$workerName .= '_' . $workerIndex;
|
||||
}
|
||||
|
||||
if (\str_starts_with($workerName, 'databases')) {
|
||||
$queueName = App::getEnv('_APP_QUEUE_NAME', 'database_db_main');
|
||||
} else {
|
||||
$queueName = App::getEnv('_APP_QUEUE_NAME', 'v1-' . strtolower($workerName));
|
||||
}
|
||||
|
||||
try {
|
||||
/**
|
||||
* Any worker can be configured with the following env vars:
|
||||
|
@ -209,12 +215,6 @@ try {
|
|||
* - _APP_WORKER_PER_CORE The number of worker processes per core (ignored if _APP_WORKERS_NUM is set)
|
||||
* - _APP_QUEUE_NAME The name of the queue to read for database events
|
||||
*/
|
||||
if ($workerName === 'databases') {
|
||||
$queueName = App::getEnv('_APP_QUEUE_NAME', 'database_db_main');
|
||||
} else {
|
||||
$queueName = App::getEnv('_APP_QUEUE_NAME', 'v1-' . strtolower($workerName));
|
||||
}
|
||||
|
||||
$platform->init(Service::TYPE_WORKER, [
|
||||
'workersNum' => App::getEnv('_APP_WORKERS_NUM', 1),
|
||||
'connection' => $pools->get('queue')->pop()->getResource(),
|
||||
|
@ -240,20 +240,20 @@ $worker
|
|||
->inject('error')
|
||||
->inject('logger')
|
||||
->inject('log')
|
||||
->action(function (Throwable $error, ?Logger $logger, Log $log) {
|
||||
->action(function (Throwable $error, ?Logger $logger, Log $log) use ($queueName) {
|
||||
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
if ($error instanceof PDOException) {
|
||||
throw $error;
|
||||
}
|
||||
|
||||
if ($logger && ($error->getCode() >= 500 || $error->getCode() === 0)) {
|
||||
if ($logger) {
|
||||
$log->setNamespace("appwrite-worker");
|
||||
$log->setServer(\gethostname());
|
||||
$log->setVersion($version);
|
||||
$log->setType(Log::TYPE_ERROR);
|
||||
$log->setMessage($error->getMessage());
|
||||
$log->setAction('appwrite-queue-' . App::getEnv('QUEUE'));
|
||||
$log->setAction('appwrite-queue-' . $queueName);
|
||||
$log->addTag('verboseType', get_class($error));
|
||||
$log->addTag('code', $error->getCode());
|
||||
$log->addExtra('file', $error->getFile());
|
||||
|
|
|
@ -4,7 +4,6 @@ namespace Appwrite\Platform\Workers;
|
|||
|
||||
use Appwrite\Event\Event;
|
||||
use Appwrite\Event\Func;
|
||||
use Appwrite\Event\Usage;
|
||||
use Appwrite\Messaging\Adapter\Realtime;
|
||||
use Appwrite\Usage\Stats;
|
||||
use Appwrite\Utopia\Response\Model\Deployment;
|
||||
|
@ -24,10 +23,10 @@ use Utopia\Database\Exception\Restricted;
|
|||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Database\Exception\Structure;
|
||||
use Utopia\Database\Helpers\ID;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\Storage\Device\Local;
|
||||
use Utopia\Storage\Storage;
|
||||
use Utopia\VCS\Adapter\Git\GitHub;
|
||||
|
||||
class Builds extends Action
|
||||
|
@ -52,7 +51,8 @@ class Builds extends Action
|
|||
->inject('cache')
|
||||
->inject('dbForProject')
|
||||
->inject('getFunctionsDevice')
|
||||
->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $getFunctionsDevice));
|
||||
->inject('log')
|
||||
->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice, Log $log) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $getFunctionsDevice, $log));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -64,10 +64,11 @@ class Builds extends Action
|
|||
* @param Cache $cache
|
||||
* @param Database $dbForProject
|
||||
* @param callable $getFunctionsDevice
|
||||
* @param Log $log
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice): void
|
||||
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Stats $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice, Log $log): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
|
@ -81,12 +82,15 @@ class Builds extends Action
|
|||
$deployment = new Document($payload['deployment'] ?? []);
|
||||
$template = new Document($payload['template'] ?? []);
|
||||
|
||||
$log->addTag('projectId', $project->getId());
|
||||
$log->addTag('type', $type);
|
||||
|
||||
switch ($type) {
|
||||
case BUILD_TYPE_DEPLOYMENT:
|
||||
case BUILD_TYPE_RETRY:
|
||||
Console::info('Creating build for deployment: ' . $deployment->getId());
|
||||
$github = new GitHub($cache);
|
||||
$this->buildDeployment($getFunctionsDevice, $queueForFunctions, $queueForEvents, $usage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template);
|
||||
$this->buildDeployment($getFunctionsDevice, $queueForFunctions, $queueForEvents, $usage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log);
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -106,20 +110,27 @@ class Builds extends Action
|
|||
* @param Document $function
|
||||
* @param Document $deployment
|
||||
* @param Document $template
|
||||
* @param Log $log
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function buildDeployment(callable $getFunctionsDevice, Func $queueForFunctions, Event $queueForEvents, Stats $usage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template): void
|
||||
protected function buildDeployment(callable $getFunctionsDevice, Func $queueForFunctions, Event $queueForEvents, Stats $usage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void
|
||||
{
|
||||
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
|
||||
|
||||
$function = $dbForProject->getDocument('functions', $function->getId());
|
||||
$functionId = $function->getId();
|
||||
$log->addTag('functionId', $function->getId());
|
||||
|
||||
$function = $dbForProject->getDocument('functions', $functionId);
|
||||
if ($function->isEmpty()) {
|
||||
throw new Exception('Function not found', 404);
|
||||
}
|
||||
|
||||
$deployment = $dbForProject->getDocument('deployments', $deployment->getId());
|
||||
$deploymentId = $deployment->getId();
|
||||
$log->addTag('deploymentId', $deploymentId);
|
||||
|
||||
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
|
||||
if ($deployment->isEmpty()) {
|
||||
throw new Exception('Deployment not found', 404);
|
||||
}
|
||||
|
@ -501,10 +512,7 @@ class Builds extends Action
|
|||
$build->setAttribute('endTime', $endTime);
|
||||
$build->setAttribute('duration', \intval(\ceil($durationEnd - $durationStart)));
|
||||
$build->setAttribute('status', 'failed');
|
||||
$build->setAttribute('logs', $th->getMessage());
|
||||
Console::error($th->getMessage());
|
||||
Console::error($th->getFile() . ':' . $th->getLine());
|
||||
Console::error($th->getTraceAsString());
|
||||
$build->setAttribute('logs', $th->getMessage() . "\n" . $th->getFile() . ':' . $th->getLine() . "\n" . $th->getTraceAsString());
|
||||
|
||||
if ($isVcsEnabled) {
|
||||
$this->runGitAction('failed', $github, $providerCommitHash, $owner, $repositoryName, $project, $function, $deployment->getId(), $dbForProject, $dbForConsole);
|
||||
|
|
|
@ -23,6 +23,7 @@ use Utopia\Database\Helpers\ID;
|
|||
use Utopia\Database\Query;
|
||||
use Utopia\Domains\Domain;
|
||||
use Utopia\Locale\Locale;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
|
||||
|
@ -45,7 +46,8 @@ class Certificates extends Action
|
|||
->inject('queueForMails')
|
||||
->inject('queueForEvents')
|
||||
->inject('queueForFunctions')
|
||||
->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions));
|
||||
->inject('log')
|
||||
->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -54,11 +56,12 @@ class Certificates extends Action
|
|||
* @param Mail $queueForMails
|
||||
* @param Event $queueForEvents
|
||||
* @param Func $queueForFunctions
|
||||
* @param Log $log
|
||||
* @return void
|
||||
* @throws Throwable
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions): void
|
||||
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
|
@ -70,6 +73,8 @@ class Certificates extends Action
|
|||
$domain = new Domain($document->getAttribute('domain', ''));
|
||||
$skipRenewCheck = $payload['skipRenewCheck'] ?? false;
|
||||
|
||||
$log->addTag('domain', $domain->get());
|
||||
|
||||
$this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $skipRenewCheck);
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,6 @@ namespace Appwrite\Platform\Workers;
|
|||
|
||||
use Appwrite\Event\Event;
|
||||
use Appwrite\Messaging\Adapter\Realtime;
|
||||
use Appwrite\Utopia\Response\Model\Platform;
|
||||
use Exception;
|
||||
use Utopia\Audit\Audit;
|
||||
use Utopia\CLI\Console;
|
||||
|
@ -16,6 +15,7 @@ use Utopia\Database\Exception\Restricted;
|
|||
use Utopia\Database\Exception\Structure;
|
||||
use Utopia\Database\Exception as DatabaseException;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
|
||||
|
@ -36,17 +36,19 @@ class Databases extends Action
|
|||
->inject('message')
|
||||
->inject('dbForConsole')
|
||||
->inject('dbForProject')
|
||||
->callback(fn($message, $dbForConsole, $dbForProject) => $this->action($message, $dbForConsole, $dbForProject));
|
||||
->inject('log')
|
||||
->callback(fn(Message $message, Database $dbForConsole, Database $dbForProject, Log $log) => $this->action($message, $dbForConsole, $dbForProject, $log));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param Database $dbForConsole
|
||||
* @param Database $dbForProject
|
||||
* @param Log $log
|
||||
* @return void
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function action(Message $message, Database $dbForConsole, Database $dbForProject): void
|
||||
public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
|
@ -60,18 +62,23 @@ class Databases extends Action
|
|||
$document = new Document($payload['document'] ?? []);
|
||||
$database = new Document($payload['database'] ?? []);
|
||||
|
||||
$log->addTag('projectId', $project->getId());
|
||||
$log->addTag('type', $type);
|
||||
|
||||
if ($database->isEmpty()) {
|
||||
throw new Exception('Missing database');
|
||||
}
|
||||
|
||||
match (strval($type)) {
|
||||
$log->addTag('databaseId', $database->getId());
|
||||
|
||||
match (\strval($type)) {
|
||||
DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject),
|
||||
DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject),
|
||||
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||
default => Console::error('No database operation for type: ' . $type),
|
||||
default => throw new \Exception('No database operation for type: ' . \strval($type)),
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -159,6 +166,7 @@ class Databases extends Action
|
|||
|
||||
$dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'available'));
|
||||
} catch (\Exception $e) {
|
||||
// TODO: Send non DatabaseExceptions to Sentry
|
||||
Console::error($e->getMessage());
|
||||
|
||||
if ($e instanceof DatabaseException) {
|
||||
|
@ -168,6 +176,7 @@ class Databases extends Action
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
$dbForProject->updateDocument(
|
||||
'attributes',
|
||||
$attribute->getId(),
|
||||
|
@ -261,6 +270,7 @@ class Databases extends Action
|
|||
$dbForProject->deleteDocument('attributes', $relatedAttribute->getId());
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
// TODO: Send non DatabaseExceptions to Sentry
|
||||
Console::error($e->getMessage());
|
||||
|
||||
if ($e instanceof DatabaseException) {
|
||||
|
@ -388,6 +398,7 @@ class Databases extends Action
|
|||
}
|
||||
$dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'available'));
|
||||
} catch (\Exception $e) {
|
||||
// TODO: Send non DatabaseExceptions to Sentry
|
||||
Console::error($e->getMessage());
|
||||
|
||||
if ($e instanceof DatabaseException) {
|
||||
|
@ -445,6 +456,7 @@ class Databases extends Action
|
|||
$dbForProject->deleteDocument('indexes', $index->getId());
|
||||
$index->setAttribute('status', 'deleted');
|
||||
} catch (\Exception $e) {
|
||||
// TODO: Send non DatabaseExceptions to Sentry
|
||||
Console::error($e->getMessage());
|
||||
|
||||
if ($e instanceof DatabaseException) {
|
||||
|
@ -581,7 +593,7 @@ class Databases extends Action
|
|||
$callback($document);
|
||||
}
|
||||
} else {
|
||||
Console::error('Failed to delete document: ' . $document->getId());
|
||||
Console::warning('Failed to delete document: ' . $document->getId());
|
||||
}
|
||||
$count++;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ use Utopia\Database\Exception\Conflict;
|
|||
use Utopia\Database\Exception\Restricted;
|
||||
use Utopia\Database\Exception\Structure;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\Storage\Device;
|
||||
|
@ -46,14 +47,15 @@ class Deletes extends Action
|
|||
->inject('getFunctionsDevice')
|
||||
->inject('getBuildsDevice')
|
||||
->inject('getCacheDevice')
|
||||
->callback(fn ($message, $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice));
|
||||
->inject('log')
|
||||
->callback(fn ($message, $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Log $log) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $log));
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
* @throws Throwable
|
||||
*/
|
||||
public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice): void
|
||||
public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Log $log): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
|
@ -68,7 +70,10 @@ class Deletes extends Action
|
|||
$document = new Document($payload['document'] ?? []);
|
||||
$project = new Document($payload['project'] ?? []);
|
||||
|
||||
switch (strval($type)) {
|
||||
$log->addTag('projectId', $project->getId());
|
||||
$log->addTag('type', $type);
|
||||
|
||||
switch (\strval($type)) {
|
||||
case DELETE_TYPE_DOCUMENT:
|
||||
switch ($document->getCollection()) {
|
||||
case DELETE_TYPE_DATABASES:
|
||||
|
@ -154,7 +159,7 @@ class Deletes extends Action
|
|||
$this->deleteTopic($project, $getProjectDB, $document);
|
||||
break;
|
||||
default:
|
||||
Console::error('No delete operation for type: ' . $type);
|
||||
throw new \Exception('No delete operation for type: ' . \strval($type));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,6 +90,10 @@ class Functions extends Action
|
|||
return;
|
||||
}
|
||||
|
||||
$log->addTag('functionId', $function->getId());
|
||||
$log->addTag('projectId', $project->getId());
|
||||
$log->addTag('type', $type);
|
||||
|
||||
if (!empty($events)) {
|
||||
$limit = 30;
|
||||
$sum = 30;
|
||||
|
@ -236,15 +240,14 @@ class Functions extends Action
|
|||
string $eventData = null,
|
||||
string $executionId = null,
|
||||
): void {
|
||||
$user ??= new Document();
|
||||
$functionId = $function->getId();
|
||||
$deploymentId = $function->getAttribute('deployment', '');
|
||||
$user ??= new Document();
|
||||
$functionId = $function->getId();
|
||||
$deploymentId = $function->getAttribute('deployment', '');
|
||||
|
||||
$log->addTag('functionId', $functionId);
|
||||
$log->addTag('projectId', $project->getId());
|
||||
$log->addTag('deploymentId', $deploymentId);
|
||||
|
||||
/** Check if deployment exists */
|
||||
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
|
||||
/** Check if deployment exists */
|
||||
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
|
||||
|
||||
if ($deployment->getAttribute('resourceId') !== $functionId) {
|
||||
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
|
||||
|
@ -254,8 +257,12 @@ class Functions extends Action
|
|||
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
|
||||
}
|
||||
|
||||
/** Check if build has exists */
|
||||
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
|
||||
$buildId = $deployment->getAttribute('buildId', '');
|
||||
|
||||
$log->addTag('buildId', $buildId);
|
||||
|
||||
/** Check if build has exists */
|
||||
$build = $dbForProject->getDocument('builds', $buildId);
|
||||
if ($build->isEmpty()) {
|
||||
throw new Exception('Build not found');
|
||||
}
|
||||
|
@ -264,7 +271,7 @@ class Functions extends Action
|
|||
throw new Exception('Build not ready');
|
||||
}
|
||||
|
||||
/** Check if runtime is supported */
|
||||
/** Check if runtime is supported */
|
||||
$version = $function->getAttribute('version', 'v2');
|
||||
$runtimes = Config::getParam($version === 'v2' ? 'runtimes-v2' : 'runtimes', []);
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ use Exception;
|
|||
use PHPMailer\PHPMailer\PHPMailer;
|
||||
use Swoole\Runtime;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\Registry\Registry;
|
||||
|
@ -28,17 +28,19 @@ class Mails extends Action
|
|||
->desc('Mails worker')
|
||||
->inject('message')
|
||||
->inject('register')
|
||||
->callback(fn($message, $register) => $this->action($message, $register));
|
||||
->inject('log')
|
||||
->callback(fn(Message $message, Registry $register, Log $log) => $this->action($message, $register, $log));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param Registry $register
|
||||
* @param Log $log
|
||||
* @throws \PHPMailer\PHPMailer\Exception
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Message $message, Registry $register): void
|
||||
public function action(Message $message, Registry $register, Log $log): void
|
||||
{
|
||||
Runtime::setHookFlags(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP);
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
@ -50,10 +52,11 @@ class Mails extends Action
|
|||
$smtp = $payload['smtp'];
|
||||
|
||||
if (empty($smtp) && empty(App::getEnv('_APP_SMTP_HOST'))) {
|
||||
Console::info('Skipped mail processing. No SMTP configuration has been set.');
|
||||
return;
|
||||
throw new Exception('Skipped mail processing. No SMTP configuration has been set.');
|
||||
}
|
||||
|
||||
$log->addTag('type', empty($smtp) ? 'cloud' : 'smtp');
|
||||
|
||||
$recipient = $payload['recipient'];
|
||||
$subject = $payload['subject'];
|
||||
$variables = $payload['variables'];
|
||||
|
|
|
@ -7,6 +7,7 @@ use Utopia\App;
|
|||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Helpers\ID;
|
||||
use Utopia\DSN\DSN;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\Database\Database;
|
||||
|
@ -48,28 +49,29 @@ class Messaging extends Action
|
|||
$this
|
||||
->desc('Messaging worker')
|
||||
->inject('message')
|
||||
->inject('log')
|
||||
->inject('dbForProject')
|
||||
->callback(fn(Message $message, Database $dbForProject) => $this->action($message, $dbForProject));
|
||||
->callback(fn(Message $message, Log $log, Database $dbForProject) => $this->action($message, $log, $dbForProject));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param Log $log
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Message $message, Database $dbForProject): void
|
||||
public function action(Message $message, Log $log, Database $dbForProject): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
if (empty($payload)) {
|
||||
Console::error('Payload not found.');
|
||||
return;
|
||||
throw new \Exception('Payload not found.');
|
||||
}
|
||||
|
||||
if (!\is_null($payload['message']) && !\is_null($payload['recipients'])) {
|
||||
if ($payload['providerType'] === MESSAGE_TYPE_SMS) {
|
||||
$this->processInternalSMSMessage(new Document($payload['message']), $payload['recipients']);
|
||||
$this->processInternalSMSMessage($log, new Document($payload['message']), $payload['recipients']);
|
||||
}
|
||||
} else {
|
||||
$message = $dbForProject->getDocument('messages', $payload['messageId']);
|
||||
|
@ -248,11 +250,10 @@ class Messaging extends Action
|
|||
$dbForProject->updateDocument('messages', $message->getId(), $message);
|
||||
}
|
||||
|
||||
private function processInternalSMSMessage(Document $message, array $recipients): void
|
||||
private function processInternalSMSMessage(Log $log, Document $message, array $recipients): void
|
||||
{
|
||||
if (empty(App::getEnv('_APP_SMS_PROVIDER')) || empty(App::getEnv('_APP_SMS_FROM'))) {
|
||||
Console::info('Skipped SMS processing. No Phone configuration has been set.');
|
||||
return;
|
||||
throw new \Exception('Skipped SMS processing. No Phone configuration has been set.');
|
||||
}
|
||||
|
||||
$smsDSN = new DSN(App::getEnv('_APP_SMS_PROVIDER'));
|
||||
|
@ -260,6 +261,8 @@ class Messaging extends Action
|
|||
$password = $smsDSN->getPassword();
|
||||
$user = $smsDSN->getUser();
|
||||
|
||||
$log->addTag('type', $host);
|
||||
|
||||
$from = App::getEnv('_APP_SMS_FROM');
|
||||
|
||||
$provider = new Document([
|
||||
|
@ -311,7 +314,7 @@ class Messaging extends Action
|
|||
try {
|
||||
$adapter->send($data);
|
||||
} catch (\Exception $e) {
|
||||
Console::error('Failed sending to targets ' . $batchIndex + 1 . '-' . \count($batch) . ' with error: ' . $e->getMessage());
|
||||
Console::error('Failed sending to targets ' . $batchIndex + 1 . '-' . \count($batch) . ' with error: ' . $e->getMessage()); // TODO: Find a way to log into Sentry
|
||||
}
|
||||
};
|
||||
}, $batches));
|
||||
|
|
|
@ -17,6 +17,7 @@ use Appwrite\Role;
|
|||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Helpers\ID;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Migration\Destinations\Appwrite as DestinationsAppwrite;
|
||||
use Utopia\Migration\Resource;
|
||||
use Utopia\Migration\Source;
|
||||
|
@ -46,17 +47,19 @@ class Migrations extends Action
|
|||
->inject('message')
|
||||
->inject('dbForProject')
|
||||
->inject('dbForConsole')
|
||||
->callback(fn(Message $message, Database $dbForProject, Database $dbForConsole) => $this->action($message, $dbForProject, $dbForConsole));
|
||||
->inject('log')
|
||||
->callback(fn(Message $message, Database $dbForProject, Database $dbForConsole, Log $log) => $this->action($message, $dbForProject, $dbForConsole, $log));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param Database $dbForProject
|
||||
* @param Database $dbForConsole
|
||||
* @param Log $log
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Message $message, Database $dbForProject, Database $dbForConsole): void
|
||||
public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
|
@ -82,7 +85,9 @@ class Migrations extends Action
|
|||
return;
|
||||
}
|
||||
|
||||
$this->processMigration($project, $migration);
|
||||
$log->addTag('projectId', $project->getId());
|
||||
|
||||
$this->processMigration($project, $migration, $log);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -228,6 +233,7 @@ class Migrations extends Action
|
|||
/**
|
||||
* @param Document $project
|
||||
* @param Document $migration
|
||||
* @param Log $log
|
||||
* @return void
|
||||
* @throws Authorization
|
||||
* @throws Conflict
|
||||
|
@ -235,7 +241,7 @@ class Migrations extends Action
|
|||
* @throws Structure
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
protected function processMigration(Document $project, Document $migration): void
|
||||
protected function processMigration(Document $project, Document $migration, Log $log): void
|
||||
{
|
||||
/**
|
||||
* @var Document $migrationDocument
|
||||
|
@ -252,6 +258,8 @@ class Migrations extends Action
|
|||
$migrationDocument->setAttribute('status', 'processing');
|
||||
$this->updateMigrationDocument($migrationDocument, $projectDocument);
|
||||
|
||||
$log->addTag('type', $migrationDocument->getAttribute('source'));
|
||||
|
||||
$source = $this->processSource($migrationDocument->getAttribute('source'), $migrationDocument->getAttribute('credentials'));
|
||||
|
||||
$source->report();
|
||||
|
|
|
@ -5,6 +5,7 @@ namespace Appwrite\Platform\Workers;
|
|||
use Exception;
|
||||
use Utopia\App;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
|
||||
|
@ -25,15 +26,17 @@ class Webhooks extends Action
|
|||
$this
|
||||
->desc('Webhooks worker')
|
||||
->inject('message')
|
||||
->callback(fn ($message) => $this->action($message));
|
||||
->inject('log')
|
||||
->callback(fn (Message $message, Log $log) => $this->action($message, $log));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param Log $log
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Message $message): void
|
||||
public function action(Message $message, Log $log): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
|
@ -46,6 +49,8 @@ class Webhooks extends Action
|
|||
$project = new Document($payload['project']);
|
||||
$user = new Document($payload['user'] ?? []);
|
||||
|
||||
$log->addTag('projectId', $project->getId());
|
||||
|
||||
foreach ($project->getAttribute('webhooks', []) as $webhook) {
|
||||
if (array_intersect($webhook->getAttribute('events', []), $events)) {
|
||||
$this->execute($events, $webhookPayload, $webhook, $user, $project);
|
||||
|
|
Loading…
Reference in a new issue