1
0
Fork 0
mirror of synced 2024-10-03 10:46:27 +13:00

Port Messages worker to Utopia queue system

This commit is contained in:
Bradley Schofield 2022-12-17 20:45:58 +00:00
parent 90e93f3255
commit a8595eaca2
3 changed files with 68 additions and 46 deletions

View file

@ -853,7 +853,10 @@ App::setResource('audits', fn() => new Audit());
App::setResource('mails', fn() => new Mail()); App::setResource('mails', fn() => new Mail());
App::setResource('deletes', fn() => new Delete()); App::setResource('deletes', fn() => new Delete());
App::setResource('database', fn() => new EventDatabase()); App::setResource('database', fn() => new EventDatabase());
App::setResource('messaging', fn() => new Phone()); App::setResource('messaging', function (Connection $queue) {
return new Phone($queue);
}, ['queue']);
App::setResource('queue', function (Group $pools) { App::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource(); return $pools->get('queue')->pop()->getResource();
}, ['pools']); }, ['pools']);

View file

@ -1,7 +1,9 @@
<?php <?php
use Appwrite\Resque\Worker; require_once __DIR__ . '/../worker.php';
use Utopia\App; use Utopia\App;
use Utopia\Database\Validator\Authorization;
use Utopia\CLI\Console; use Utopia\CLI\Console;
use Utopia\DSN\DSN; use Utopia\DSN\DSN;
use Utopia\Messaging\Adapter; use Utopia\Messaging\Adapter;
@ -12,67 +14,78 @@ use Utopia\Messaging\Adapters\SMS\TextMagic;
use Utopia\Messaging\Adapters\SMS\Twilio; use Utopia\Messaging\Adapters\SMS\Twilio;
use Utopia\Messaging\Adapters\SMS\Vonage; use Utopia\Messaging\Adapters\SMS\Vonage;
use Utopia\Messaging\Messages\SMS; use Utopia\Messaging\Messages\SMS;
use Utopia\Queue\Message;
use Utopia\Queue\Server;
require_once __DIR__ . '/../init.php'; Authorization::disable();
Authorization::setDefaultStatus(false);
Console::title('Messaging V1 Worker'); $dsn = new DSN(App::getEnv('_APP_SMS_PROVIDER'));
Console::success(APP_NAME . ' messaging worker v1 has started' . "\n"); $user = $dsn->getUser();
$secret = $dsn->getPassword();
class MessagingV1 extends Worker Server::setResource('sms', function () use ($dsn, $user, $secret) {
{ return match ($dsn->getHost()) {
protected ?Adapter $sms = null; 'mock' => new Mock($user, $secret), // used for tests
protected ?string $from = null; 'twilio' => new Twilio($user, $secret),
'text-magic' => new TextMagic($user, $secret),
'telesign' => new Telesign($user, $secret),
'msg91' => new Msg91($user, $secret),
'vonage' => new Vonage($user, $secret),
default => null
};
});
public function getName(): string Server::setResource('execute', function () {
{ return function (string $recipient, string $message, Adapter $sms) {
return "mails"; $from = App::getEnv('_APP_SMS_FROM');
}
public function init(): void
{
$dsn = new DSN(App::getEnv('_APP_SMS_PROVIDER'));
$user = $dsn->getUser();
$secret = $dsn->getPassword();
$this->sms = match ($dsn->getHost()) {
'mock' => new Mock($user, $secret), // used for tests
'twilio' => new Twilio($user, $secret),
'text-magic' => new TextMagic($user, $secret),
'telesign' => new Telesign($user, $secret),
'msg91' => new Msg91($user, $secret),
'vonage' => new Vonage($user, $secret),
default => null
};
$this->from = App::getEnv('_APP_SMS_FROM');
}
public function run(): void
{
if (empty(App::getEnv('_APP_SMS_PROVIDER'))) { if (empty(App::getEnv('_APP_SMS_PROVIDER'))) {
Console::info('Skipped sms processing. No Phone provider has been set.'); Console::info('Skipped sms processing. No Phone provider has been set.');
return; return;
} }
if (empty($this->from)) { if (empty($from)) {
Console::info('Skipped sms processing. No phone number has been set.'); Console::info('Skipped sms processing. No phone number has been set.');
return; return;
} }
$message = new SMS( $message = new SMS(
to: [$this->args['recipient']], to: [$recipient],
content: $this->args['message'], content: $message,
from: $this->from, from: $from,
); );
try { try {
$this->sms->send($message); $sms->send($message);
Console::info('Successfully sent sms message to ' . $recipient);
} catch (\Exception $error) { } catch (\Exception $error) {
throw new Exception('Error sending message: ' . $error->getMessage(), 500); throw new Exception('Error sending message: ' . $error->getMessage(), 500);
} }
} };
});
public function shutdown(): void $server->job()
{ ->inject('message')
} ->inject('execute')
} ->inject('sms')
->action(function (Message $message, callable $execute, Adapter $sms) {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
if (empty($payload['recipient'])) {
throw new Exception('Missing recipient');
}
if (empty($payload['message'])) {
throw new Exception('Missing message');
}
$execute($payload['recipient'], $payload['message'], $sms);
});
$server->workerStart();
$server->start();

View file

@ -3,13 +3,15 @@
namespace Appwrite\Event; namespace Appwrite\Event;
use Resque; use Resque;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Phone extends Event class Phone extends Event
{ {
protected string $recipient = ''; protected string $recipient = '';
protected string $message = ''; protected string $message = '';
public function __construct() public function __construct(protected Connection $connection)
{ {
parent::__construct(Event::MESSAGING_QUEUE_NAME, Event::MESSAGING_CLASS_NAME); parent::__construct(Event::MESSAGING_QUEUE_NAME, Event::MESSAGING_CLASS_NAME);
} }
@ -68,7 +70,11 @@ class Phone extends Event
*/ */
public function trigger(): string|bool public function trigger(): string|bool
{ {
return Resque::enqueue($this->queue, $this->class, [ $client = new Client($this->queue, $this->connection);
$events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null;
return $client->enqueue([
'project' => $this->project, 'project' => $this->project,
'user' => $this->user, 'user' => $this->user,
'payload' => $this->payload, 'payload' => $this->payload,