1
0
Fork 0
mirror of synced 2024-09-30 17:26:48 +13:00

refactor(messaging worker): Refactoring local device to worker only

This commit is contained in:
Binyamin Yawitz 2024-05-21 15:48:53 -04:00
parent 13b275a4ba
commit ab0d6f7c19
No known key found for this signature in database
2 changed files with 24 additions and 19 deletions

View file

@ -32,7 +32,6 @@ use Utopia\Queue\Connection;
use Utopia\Queue\Message;
use Utopia\Queue\Server;
use Utopia\Registry\Registry;
use Utopia\Storage\Device\Local;
use Utopia\System\System;
Authorization::disable();
@ -217,9 +216,6 @@ Server::setResource('deviceForCache', function (Document $project) {
return getDevice(APP_STORAGE_CACHE . '/app-' . $project->getId());
}, ['project']);
Server::setResource('deviceForLocalFiles', function (Document $project) {
return new Local(APP_STORAGE_UPLOADS . '/app-' . $project->getId());
}, ['project']);
$pools = $register->get('pools');
$platform = new Appwrite();

View file

@ -35,6 +35,7 @@ use Utopia\Messaging\Messages\SMS;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Storage\Device;
use Utopia\Storage\Device\Local;
use Utopia\Storage\Storage;
use Utopia\System\System;
@ -42,6 +43,8 @@ use function Swoole\Coroutine\batch;
class Messaging extends Action
{
private ?Local $localDevice = null;
public static function getName(): string
{
return 'messaging';
@ -58,9 +61,8 @@ class Messaging extends Action
->inject('log')
->inject('dbForProject')
->inject('deviceForFiles')
->inject('deviceForLocalFiles')
->inject('queueForUsage')
->callback(fn (Message $message, Log $log, Database $dbForProject, Device $deviceForFiles, Device $deviceForLocalFiles, Usage $queueForUsage) => $this->action($message, $log, $dbForProject, $deviceForFiles, $deviceForLocalFiles, $queueForUsage));
->callback(fn (Message $message, Log $log, Database $dbForProject, Device $deviceForFiles, Usage $queueForUsage) => $this->action($message, $log, $dbForProject, $deviceForFiles, $queueForUsage));
}
/**
@ -68,7 +70,6 @@ class Messaging extends Action
* @param Log $log
* @param Database $dbForProject
* @param Device $deviceForFiles
* @param Device $deviceForLocalFiles
* @param Usage $queueForUsage
* @return void
* @throws \Exception
@ -78,7 +79,6 @@ class Messaging extends Action
Log $log,
Database $dbForProject,
Device $deviceForFiles,
Device $deviceForLocalFiles,
Usage $queueForUsage
): void {
Runtime::setHookFlags(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP);
@ -101,7 +101,7 @@ class Messaging extends Action
case MESSAGE_SEND_TYPE_EXTERNAL:
$message = $dbForProject->getDocument('messages', $payload['messageId']);
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $deviceForLocalFiles, );
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project);
break;
default:
throw new \Exception('Unknown message type: ' . $type);
@ -112,7 +112,7 @@ class Messaging extends Action
Database $dbForProject,
Document $message,
Device $deviceForFiles,
Device $deviceForLocalFiles,
Document $project,
): void {
$topicIds = $message->getAttribute('topics', []);
$targetIds = $message->getAttribute('targets', []);
@ -218,8 +218,8 @@ class Messaging extends Action
/**
* @var array<array> $results
*/
$results = batch(\array_map(function ($providerId) use ($identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
return function () use ($providerId, $identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
$results = batch(\array_map(function ($providerId) use ($identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project) {
return function () use ($providerId, $identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project) {
if (\array_key_exists($providerId, $providers)) {
$provider = $providers[$providerId];
} else {
@ -246,8 +246,8 @@ class Messaging extends Action
$adapter->getMaxMessagesPerRequest()
);
return batch(\array_map(function ($batch) use ($message, $provider, $adapter, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
return function () use ($batch, $message, $provider, $adapter, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
return batch(\array_map(function ($batch) use ($message, $provider, $adapter, $dbForProject, $deviceForFiles, $project) {
return function () use ($batch, $message, $provider, $adapter, $dbForProject, $deviceForFiles, $project) {
$deliveredTotal = 0;
$deliveryErrors = [];
$messageData = clone $message;
@ -256,7 +256,7 @@ class Messaging extends Action
$data = match ($provider->getAttribute('type')) {
MESSAGE_TYPE_SMS => $this->buildSmsMessage($messageData, $provider),
MESSAGE_TYPE_PUSH => $this->buildPushMessage($messageData),
MESSAGE_TYPE_EMAIL => $this->buildEmailMessage($dbForProject, $messageData, $provider, $deviceForFiles, $deviceForLocalFiles),
MESSAGE_TYPE_EMAIL => $this->buildEmailMessage($dbForProject, $messageData, $provider, $deviceForFiles, $project),
default => throw new \Exception('Provider with the requested ID is of the incorrect type')
};
@ -354,8 +354,8 @@ class Messaging extends Action
$path = $file->getAttribute('path', '');
if ($deviceForLocalFiles->exists($path)) {
$deviceForLocalFiles->delete($path);
if ($this->getLocalDevice($project)->exists($path)) {
$this->getLocalDevice($project)->delete($path);
}
}
}
@ -517,7 +517,7 @@ class Messaging extends Action
Document $message,
Document $provider,
Device $deviceForFiles,
Device $deviceForLocalFiles,
Document $project,
): Email {
$fromName = $provider['options']['fromName'] ?? null;
$fromEmail = $provider['options']['fromEmail'] ?? null;
@ -579,7 +579,7 @@ class Messaging extends Action
}
if ($deviceForFiles->getType() !== Storage::DEVICE_LOCAL) {
$deviceForFiles->transfer($path, $path, $deviceForLocalFiles);
$deviceForFiles->transfer($path, $path, $this->getLocalDevice($project));
}
$attachment = new Attachment(
@ -651,4 +651,13 @@ class Messaging extends Action
$badge
);
}
private function getLocalDevice($project): Local
{
if($this->localDevice === null) {
$this->localDevice = new Local(APP_STORAGE_UPLOADS . '/app-' . $project->getId());
}
return $this->localDevice;
}
}