1
0
Fork 0
mirror of synced 2024-07-02 21:20:58 +12:00

Auto trigger messaging events

This commit is contained in:
Jake Barnby 2024-02-21 01:06:35 +13:00
parent 2f711c84ed
commit bbce53cda5
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C
8 changed files with 89 additions and 60 deletions

View file

@ -1773,10 +1773,10 @@ App::post('/v1/account/tokens/phone')
]); ]);
$queueForMessaging $queueForMessaging
->setType(MESSAGE_SEND_TYPE_INTERNAL)
->setMessage($messageDoc) ->setMessage($messageDoc)
->setRecipients([$phone]) ->setRecipients([$phone])
->setProviderType(MESSAGE_TYPE_SMS) ->setProviderType(MESSAGE_TYPE_SMS);
->trigger();
$queueForEvents->setPayload( $queueForEvents->setPayload(
$response->output( $response->output(
@ -3314,10 +3314,10 @@ App::post('/v1/account/verification/phone')
]); ]);
$queueForMessaging $queueForMessaging
->setType(MESSAGE_SEND_TYPE_INTERNAL)
->setMessage($messageDoc) ->setMessage($messageDoc)
->setRecipients([$user->getAttribute('phone')]) ->setRecipients([$user->getAttribute('phone')])
->setProviderType(MESSAGE_TYPE_SMS) ->setProviderType(MESSAGE_TYPE_SMS);
->trigger();
$queueForEvents $queueForEvents
->setParam('userId', $user->getId()) ->setParam('userId', $user->getId())
@ -3677,14 +3677,14 @@ App::post('/v1/account/mfa/challenge')
} }
$queueForMessaging $queueForMessaging
->setType(MESSAGE_SEND_TYPE_INTERNAL)
->setMessage(new Document([ ->setMessage(new Document([
'$id' => $challenge->getId(), '$id' => $challenge->getId(),
'data' => [ 'data' => [
'content' => $code, 'content' => $code,
], ],
])) ]))
->setRecipients([$user->getAttribute('phone')]) ->setRecipients([$user->getAttribute('phone')]);
->trigger();
break; break;
case 'email': case 'email':
if (empty(App::getEnv('_APP_SMTP_HOST'))) { if (empty(App::getEnv('_APP_SMTP_HOST'))) {

View file

@ -2635,8 +2635,8 @@ App::post('/v1/messaging/messages/email')
switch ($status) { switch ($status) {
case MessageStatus::PROCESSING: case MessageStatus::PROCESSING:
$queueForMessaging $queueForMessaging
->setMessageId($message->getId()) ->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->trigger(); ->setMessageId($message->getId());
break; break;
case MessageStatus::SCHEDULED: case MessageStatus::SCHEDULED:
$schedule = $dbForConsole->createDocument('schedules', new Document([ $schedule = $dbForConsole->createDocument('schedules', new Document([
@ -2744,8 +2744,8 @@ App::post('/v1/messaging/messages/sms')
switch ($status) { switch ($status) {
case MessageStatus::PROCESSING: case MessageStatus::PROCESSING:
$queueForMessaging $queueForMessaging
->setMessageId($message->getId()) ->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->trigger(); ->setMessageId($message->getId());
break; break;
case MessageStatus::SCHEDULED: case MessageStatus::SCHEDULED:
$schedule = $dbForConsole->createDocument('schedules', new Document([ $schedule = $dbForConsole->createDocument('schedules', new Document([
@ -2870,8 +2870,8 @@ App::post('/v1/messaging/messages/push')
switch ($status) { switch ($status) {
case MessageStatus::PROCESSING: case MessageStatus::PROCESSING:
$queueForMessaging $queueForMessaging
->setMessageId($message->getId()) ->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->trigger(); ->setMessageId($message->getId());
break; break;
case MessageStatus::SCHEDULED: case MessageStatus::SCHEDULED:
$schedule = $dbForConsole->createDocument('schedules', new Document([ $schedule = $dbForConsole->createDocument('schedules', new Document([
@ -3263,8 +3263,8 @@ App::patch('/v1/messaging/messages/email/:messageId')
if ($status === MessageStatus::PROCESSING) { if ($status === MessageStatus::PROCESSING) {
$queueForMessaging $queueForMessaging
->setMessageId($message->getId()) ->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->trigger(); ->setMessageId($message->getId());
} }
$queueForEvents $queueForEvents
@ -3382,8 +3382,8 @@ App::patch('/v1/messaging/messages/sms/:messageId')
if ($status === MessageStatus::PROCESSING) { if ($status === MessageStatus::PROCESSING) {
$queueForMessaging $queueForMessaging
->setMessageId($message->getId()) ->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->trigger(); ->setMessageId($message->getId());
} }
$queueForEvents $queueForEvents
@ -3541,8 +3541,8 @@ App::patch('/v1/messaging/messages/push/:messageId')
if ($status === MessageStatus::PROCESSING) { if ($status === MessageStatus::PROCESSING) {
$queueForMessaging $queueForMessaging
->setMessageId($message->getId()) ->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->trigger(); ->setMessageId($message->getId());
} }
$queueForEvents $queueForEvents

View file

@ -658,10 +658,10 @@ App::post('/v1/teams/:teamId/memberships')
]); ]);
$queueForMessaging $queueForMessaging
->setType(MESSAGE_SEND_TYPE_INTERNAL)
->setMessage($messageDoc) ->setMessage($messageDoc)
->setRecipients([$phone]) ->setRecipients([$phone])
->setProviderType('SMS') ->setProviderType('SMS');
->trigger();
} }
} }

View file

@ -384,9 +384,6 @@ App::init()
->setProject($project) ->setProject($project)
->setUser($user); ->setUser($user);
$queueForMessaging
->setProject($project);
$queueForAudits $queueForAudits
->setMode($mode) ->setMode($mode)
->setUserAgent($request->getUserAgent('')) ->setUserAgent($request->getUserAgent(''))
@ -395,10 +392,10 @@ App::init()
->setProject($project) ->setProject($project)
->setUser($user); ->setUser($user);
$queueForDeletes->setProject($project); $queueForDeletes->setProject($project);
$queueForDatabase->setProject($project); $queueForDatabase->setProject($project);
$queueForBuilds->setProject($project); $queueForBuilds->setProject($project);
$queueForMessaging->setProject($project);
$dbForProject $dbForProject
->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)) ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject))
@ -517,11 +514,12 @@ App::shutdown()
->inject('queueForDeletes') ->inject('queueForDeletes')
->inject('queueForDatabase') ->inject('queueForDatabase')
->inject('queueForBuilds') ->inject('queueForBuilds')
->inject('queueForMessaging')
->inject('dbForProject') ->inject('dbForProject')
->inject('queueForFunctions') ->inject('queueForFunctions')
->inject('mode') ->inject('mode')
->inject('dbForConsole') ->inject('dbForConsole')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) {
$responsePayload = $response->getPayload(); $responsePayload = $response->getPayload();
@ -626,6 +624,10 @@ App::shutdown()
$queueForBuilds->trigger(); $queueForBuilds->trigger();
} }
if (!empty($queueForMessaging->getType())) {
$queueForBuilds->trigger();
}
/** /**
* Cache label * Cache label
*/ */

View file

@ -142,9 +142,11 @@ const APP_SOCIAL_DEV = 'https://dev.to/appwrite';
const APP_SOCIAL_STACKSHARE = 'https://stackshare.io/appwrite'; const APP_SOCIAL_STACKSHARE = 'https://stackshare.io/appwrite';
const APP_SOCIAL_YOUTUBE = 'https://www.youtube.com/c/appwrite?sub_confirmation=1'; const APP_SOCIAL_YOUTUBE = 'https://www.youtube.com/c/appwrite?sub_confirmation=1';
const APP_HOSTNAME_INTERNAL = 'appwrite'; const APP_HOSTNAME_INTERNAL = 'appwrite';
// Database Reconnect // Database Reconnect
const DATABASE_RECONNECT_SLEEP = 2; const DATABASE_RECONNECT_SLEEP = 2;
const DATABASE_RECONNECT_MAX_ATTEMPTS = 10; const DATABASE_RECONNECT_MAX_ATTEMPTS = 10;
// Database Worker Types // Database Worker Types
const DATABASE_TYPE_CREATE_ATTRIBUTE = 'createAttribute'; const DATABASE_TYPE_CREATE_ATTRIBUTE = 'createAttribute';
const DATABASE_TYPE_CREATE_INDEX = 'createIndex'; const DATABASE_TYPE_CREATE_INDEX = 'createIndex';
@ -152,9 +154,11 @@ const DATABASE_TYPE_DELETE_ATTRIBUTE = 'deleteAttribute';
const DATABASE_TYPE_DELETE_INDEX = 'deleteIndex'; const DATABASE_TYPE_DELETE_INDEX = 'deleteIndex';
const DATABASE_TYPE_DELETE_COLLECTION = 'deleteCollection'; const DATABASE_TYPE_DELETE_COLLECTION = 'deleteCollection';
const DATABASE_TYPE_DELETE_DATABASE = 'deleteDatabase'; const DATABASE_TYPE_DELETE_DATABASE = 'deleteDatabase';
// Build Worker Types // Build Worker Types
const BUILD_TYPE_DEPLOYMENT = 'deployment'; const BUILD_TYPE_DEPLOYMENT = 'deployment';
const BUILD_TYPE_RETRY = 'retry'; const BUILD_TYPE_RETRY = 'retry';
// Deletion Types // Deletion Types
const DELETE_TYPE_DATABASES = 'databases'; const DELETE_TYPE_DATABASES = 'databases';
const DELETE_TYPE_DOCUMENT = 'document'; const DELETE_TYPE_DOCUMENT = 'document';
@ -180,6 +184,10 @@ const DELETE_TYPE_TOPIC = 'topic';
const DELETE_TYPE_TARGET = 'target'; const DELETE_TYPE_TARGET = 'target';
const DELETE_TYPE_EXPIRED_TARGETS = 'invalid_targets'; const DELETE_TYPE_EXPIRED_TARGETS = 'invalid_targets';
const DELETE_TYPE_SESSION_TARGETS = 'session_targets'; const DELETE_TYPE_SESSION_TARGETS = 'session_targets';
// Message types
const MESSAGE_SEND_TYPE_INTERNAL = 'internal';
const MESSAGE_SEND_TYPE_EXTERNAL = 'external';
// Mail Types // Mail Types
const MAIL_TYPE_VERIFICATION = 'verification'; const MAIL_TYPE_VERIFICATION = 'verification';
const MAIL_TYPE_MAGIC_SESSION = 'magicSession'; const MAIL_TYPE_MAGIC_SESSION = 'magicSession';

View file

@ -8,13 +8,13 @@ use Utopia\Queue\Client;
class Messaging extends Event class Messaging extends Event
{ {
protected string $type = '';
protected ?string $messageId = null; protected ?string $messageId = null;
protected ?Document $message = null; protected ?Document $message = null;
protected ?array $recipients = null; protected ?array $recipients = null;
protected ?string $scheduledAt = null; protected ?string $scheduledAt = null;
protected ?string $providerType = null; protected ?string $providerType = null;
public function __construct(protected Connection $connection) public function __construct(protected Connection $connection)
{ {
parent::__construct($connection); parent::__construct($connection);
@ -24,6 +24,29 @@ class Messaging extends Event
->setClass(Event::MESSAGING_CLASS_NAME); ->setClass(Event::MESSAGING_CLASS_NAME);
} }
/**
* Sets type for the build event.
*
* @param string $type Can be `MESSAGE_TYPE_INTERNAL` or `MESSAGE_TYPE_EXTERNAL`.
* @return self
*/
public function setType(string $type): self
{
$this->type = $type;
return $this;
}
/**
* Returns set type for the function event.
*
* @return string
*/
public function getType(): string
{
return $this->type;
}
/** /**
* Sets recipient for the messaging event. * Sets recipient for the messaging event.
* *

View file

@ -50,6 +50,7 @@ class ScheduleMessages extends ScheduleBase
$queueForMessaging = new Messaging($connection); $queueForMessaging = new Messaging($connection);
$queueForMessaging $queueForMessaging
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->setMessageId($schedule['resourceId']) ->setMessageId($schedule['resourceId'])
->setProject($schedule['project']) ->setProject($schedule['project'])
->trigger(); ->trigger();

View file

@ -44,7 +44,7 @@ class Messaging extends Action
} }
/** /**
* @throws Exception * @throws \Exception
*/ */
public function __construct() public function __construct()
{ {
@ -63,7 +63,7 @@ class Messaging extends Action
* @param Database $dbForProject * @param Database $dbForProject
* @param Usage $queueForUsage * @param Usage $queueForUsage
* @return void * @return void
* @throws Exception * @throws \Exception
*/ */
public function action(Message $message, Log $log, Database $dbForProject, Usage $queueForUsage): void public function action(Message $message, Log $log, Database $dbForProject, Usage $queueForUsage): void
{ {
@ -73,28 +73,27 @@ class Messaging extends Action
throw new Exception('Missing payload'); throw new Exception('Missing payload');
} }
$type = $payload['type'] ?? '';
$project = new Document($payload['project'] ?? []);
if ( switch ($type) {
!\is_null($payload['message']) case MESSAGE_SEND_TYPE_INTERNAL:
&& !\is_null($payload['recipients']) $message = new Document($payload['message'] ?? []);
&& $payload['providerType'] === MESSAGE_TYPE_SMS $recipients = $payload['recipients'] ?? [];
) {
// Message was triggered internally
$this->processInternalSMSMessage(
new Document($payload['message']),
new Document($payload['project'] ?? []),
$payload['recipients'],
$queueForUsage,
$log,
);
} else {
$message = $dbForProject->getDocument('messages', $payload['messageId']);
$this->processMessage($dbForProject, $message); $this->sendInternalSMSMessage($message, $project, $recipients, $queueForUsage, $log);
break;
case MESSAGE_SEND_TYPE_EXTERNAL:
$message = $dbForProject->getDocument('messages', $payload['messageId']);
$this->sendExternalMessage($dbForProject, $message);
break;
default:
throw new Exception('Unknown message type: ' . $type);
} }
} }
private function processMessage(Database $dbForProject, Document $message): void private function sendExternalMessage(Database $dbForProject, Document $message): void
{ {
$topicIds = $message->getAttribute('topics', []); $topicIds = $message->getAttribute('topics', []);
$targetIds = $message->getAttribute('targets', []); $targetIds = $message->getAttribute('targets', []);
@ -216,9 +215,9 @@ class Messaging extends Action
$identifiers = $identifiers[$providerId]; $identifiers = $identifiers[$providerId];
$adapter = match ($provider->getAttribute('type')) { $adapter = match ($provider->getAttribute('type')) {
MESSAGE_TYPE_SMS => $this->sms($provider), MESSAGE_TYPE_SMS => $this->getSmsAdapter($provider),
MESSAGE_TYPE_PUSH => $this->push($provider), MESSAGE_TYPE_PUSH => $this->getPushAdapter($provider),
MESSAGE_TYPE_EMAIL => $this->email($provider), MESSAGE_TYPE_EMAIL => $this->getEmailAdapter($provider),
default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE) default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE)
}; };
@ -234,7 +233,7 @@ class Messaging extends Action
$messageData->setAttribute('to', $batch); $messageData->setAttribute('to', $batch);
$data = match ($provider->getAttribute('type')) { $data = match ($provider->getAttribute('type')) {
MESSAGE_TYPE_SMS => $this->buildSMSMessage($messageData, $provider), MESSAGE_TYPE_SMS => $this->buildSmsMessage($messageData, $provider),
MESSAGE_TYPE_PUSH => $this->buildPushMessage($messageData), MESSAGE_TYPE_PUSH => $this->buildPushMessage($messageData),
MESSAGE_TYPE_EMAIL => $this->buildEmailMessage($dbForProject, $messageData, $provider), MESSAGE_TYPE_EMAIL => $this->buildEmailMessage($dbForProject, $messageData, $provider),
default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE) default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE)
@ -312,7 +311,7 @@ class Messaging extends Action
$dbForProject->updateDocument('messages', $message->getId(), $message); $dbForProject->updateDocument('messages', $message->getId(), $message);
} }
private function processInternalSMSMessage(Document $message, Document $project, array $recipients, Usage $queueForUsage, Log $log): void private function sendInternalSMSMessage(Document $message, Document $project, array $recipients, Usage $queueForUsage, Log $log): void
{ {
if (empty(App::getEnv('_APP_SMS_PROVIDER')) || empty(App::getEnv('_APP_SMS_FROM'))) { if (empty(App::getEnv('_APP_SMS_PROVIDER')) || empty(App::getEnv('_APP_SMS_FROM'))) {
throw new \Exception('Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.'); throw new \Exception('Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.');
@ -375,7 +374,7 @@ class Messaging extends Action
] ]
]); ]);
$adapter = $this->sms($provider); $adapter = $this->getSmsAdapter($provider);
$maxBatchSize = $adapter->getMaxMessagesPerRequest(); $maxBatchSize = $adapter->getMaxMessagesPerRequest();
$batches = \array_chunk($recipients, $maxBatchSize); $batches = \array_chunk($recipients, $maxBatchSize);
@ -385,7 +384,7 @@ class Messaging extends Action
return function () use ($batch, $message, $provider, $adapter, $batchIndex, $project, $queueForUsage) { return function () use ($batch, $message, $provider, $adapter, $batchIndex, $project, $queueForUsage) {
$message->setAttribute('to', $batch); $message->setAttribute('to', $batch);
$data = $this->buildSMSMessage($message, $provider); $data = $this->buildSmsMessage($message, $provider);
try { try {
$adapter->send($data); $adapter->send($data);
@ -401,11 +400,7 @@ class Messaging extends Action
}, $batches)); }, $batches));
} }
public function shutdown(): void private function getSmsAdapter(Document $provider): ?SMSAdapter
{
}
private function sms(Document $provider): ?SMSAdapter
{ {
$credentials = $provider->getAttribute('credentials'); $credentials = $provider->getAttribute('credentials');
@ -420,7 +415,7 @@ class Messaging extends Action
}; };
} }
private function push(Document $provider): ?PushAdapter private function getPushAdapter(Document $provider): ?PushAdapter
{ {
$credentials = $provider->getAttribute('credentials'); $credentials = $provider->getAttribute('credentials');
@ -437,7 +432,7 @@ class Messaging extends Action
}; };
} }
private function email(Document $provider): ?EmailAdapter private function getEmailAdapter(Document $provider): ?EmailAdapter
{ {
$credentials = $provider->getAttribute('credentials', []); $credentials = $provider->getAttribute('credentials', []);
$options = $provider->getAttribute('options', []); $options = $provider->getAttribute('options', []);
@ -503,7 +498,7 @@ class Messaging extends Action
return new Email($to, $subject, $content, $fromName, $fromEmail, $replyToName, $replyToEmail, $cc, $bcc, null, $html); return new Email($to, $subject, $content, $fromName, $fromEmail, $replyToName, $replyToEmail, $cc, $bcc, null, $html);
} }
private function buildSMSMessage(Document $message, Document $provider): SMS private function buildSmsMessage(Document $message, Document $provider): SMS
{ {
$to = $message['to']; $to = $message['to'];
$content = $message['data']['content']; $content = $message['data']['content'];