1
0
Fork 0
mirror of synced 2024-07-08 16:06:02 +12:00

Merge branch 'refactor-scheduler-messaging' into refactor-schedulers-workers

This commit is contained in:
Bradley Schofield 2022-12-20 11:24:58 +00:00
commit f2e8a619da
3 changed files with 66 additions and 47 deletions

View file

@ -849,10 +849,12 @@ App::setResource('register', fn() => $register);
App::setResource('locale', fn() => new Locale(App::getEnv('_APP_LOCALE', 'en')));
// Queues
App::setResource('messaging', fn() => new Phone());
App::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
App::setResource('messaging', function (Connection $queue) {
return new Phone($queue);
}, ['queue']);
App::setResource('mails', function (Connection $queue) {
return new Mail($queue);
}, ['queue']);

View file

@ -1,7 +1,9 @@
<?php
use Appwrite\Resque\Worker;
require_once __DIR__ . '/../worker.php';
use Utopia\App;
use Utopia\Database\Validator\Authorization;
use Utopia\CLI\Console;
use Utopia\DSN\DSN;
use Utopia\Messaging\Adapter;
@ -12,67 +14,77 @@ use Utopia\Messaging\Adapters\SMS\TextMagic;
use Utopia\Messaging\Adapters\SMS\Twilio;
use Utopia\Messaging\Adapters\SMS\Vonage;
use Utopia\Messaging\Messages\SMS;
use Utopia\Queue\Message;
use Utopia\Queue\Server;
require_once __DIR__ . '/../init.php';
Authorization::disable();
Authorization::setDefaultStatus(false);
Console::title('Messaging V1 Worker');
Console::success(APP_NAME . ' messaging worker v1 has started' . "\n");
$dsn = new DSN(App::getEnv('_APP_SMS_PROVIDER'));
$user = $dsn->getUser();
$secret = $dsn->getPassword();
class MessagingV1 extends Worker
{
protected ?Adapter $sms = null;
protected ?string $from = null;
Server::setResource('sms', function () use ($dsn, $user, $secret) {
return match ($dsn->getHost()) {
'mock' => new Mock($user, $secret), // used for tests
'twilio' => new Twilio($user, $secret),
'text-magic' => new TextMagic($user, $secret),
'telesign' => new Telesign($user, $secret),
'msg91' => new Msg91($user, $secret),
'vonage' => new Vonage($user, $secret),
default => null
};
});
public function getName(): string
{
return "mails";
}
Server::setResource('execute', function () {
return function (string $recipient, string $message, Adapter $sms) {
$from = App::getEnv('_APP_SMS_FROM');
public function init(): void
{
$dsn = new DSN(App::getEnv('_APP_SMS_PROVIDER'));
$user = $dsn->getUser();
$secret = $dsn->getPassword();
$this->sms = match ($dsn->getHost()) {
'mock' => new Mock($user, $secret), // used for tests
'twilio' => new Twilio($user, $secret),
'text-magic' => new TextMagic($user, $secret),
'telesign' => new Telesign($user, $secret),
'msg91' => new Msg91($user, $secret),
'vonage' => new Vonage($user, $secret),
default => null
};
$this->from = App::getEnv('_APP_SMS_FROM');
}
public function run(): void
{
if (empty(App::getEnv('_APP_SMS_PROVIDER'))) {
Console::info('Skipped sms processing. No Phone provider has been set.');
return;
}
if (empty($this->from)) {
if (empty($from)) {
Console::info('Skipped sms processing. No phone number has been set.');
return;
}
$message = new SMS(
to: [$this->args['recipient']],
content: $this->args['message'],
from: $this->from,
to: [$recipient],
content: $message,
from: $from,
);
try {
$this->sms->send($message);
$sms->send($message);
} catch (\Exception $error) {
throw new Exception('Error sending message: ' . $error->getMessage(), 500);
}
}
};
});
public function shutdown(): void
{
}
}
$server->job()
->inject('message')
->inject('execute')
->inject('sms')
->action(function (Message $message, callable $execute, Adapter $sms) {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
if (empty($payload['recipient'])) {
throw new Exception('Missing recipient');
}
if (empty($payload['message'])) {
throw new Exception('Missing message');
}
$execute($payload['recipient'], $payload['message'], $sms);
});
$server->workerStart();
$server->start();

View file

@ -2,14 +2,15 @@
namespace Appwrite\Event;
use Resque;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Phone extends Event
{
protected string $recipient = '';
protected string $message = '';
public function __construct()
public function __construct(protected Connection $connection)
{
parent::__construct(Event::MESSAGING_QUEUE_NAME, Event::MESSAGING_CLASS_NAME);
}
@ -68,7 +69,11 @@ class Phone extends Event
*/
public function trigger(): string|bool
{
return Resque::enqueue($this->queue, $this->class, [
$client = new Client($this->queue, $this->connection);
$events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null;
return $client->enqueue([
'project' => $this->project,
'user' => $this->user,
'payload' => $this->payload,