diff --git a/Dockerfile b/Dockerfile index 059c499bd..ee9818f39 100755 --- a/Dockerfile +++ b/Dockerfile @@ -80,6 +80,7 @@ RUN chmod +x /usr/local/bin/doctor && \ chmod +x /usr/local/bin/migrate && \ chmod +x /usr/local/bin/realtime && \ chmod +x /usr/local/bin/schedule && \ + chmod +x /usr/local/bin/schedule-message && \ chmod +x /usr/local/bin/sdks && \ chmod +x /usr/local/bin/specs && \ chmod +x /usr/local/bin/ssl && \ diff --git a/app/config/collections.php b/app/config/collections.php index cbaed36f7..913de3e06 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -1593,6 +1593,28 @@ $commonCollections = [ 'array' => false, 'filters' => ['datetime'], ], + [ + '$id' => ID::custom('scheduleInternalId'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => Database::LENGTH_KEY, + 'signed' => true, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + [ + '$id' => ID::custom('scheduleId'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => Database::LENGTH_KEY, + 'signed' => true, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => [], + ], [ '$id' => ID::custom('deliveredAt'), 'type' => Database::VAR_DATETIME, @@ -4166,6 +4188,13 @@ $consoleCollections = array_merge([ 'lengths' => [], 'orders' => [], ], + [ + '$id' => ID::custom('_key_schedule_resourceType_active_resourceUpdatedAt'), + 'type' => Database::INDEX_KEY, + 'attributes' => ['schedule', 'resourceType', 'active', 'resourceUpdatedAt'], + 'lengths' => [], + 'orders' => [], + ] ], ], diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index 68e3261a8..1c32dad62 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -1375,7 +1375,6 @@ App::post('/v1/account/sessions/phone') ->setMessage($messageDoc) ->setRecipients([$phone]) ->setProviderType(MESSAGE_TYPE_SMS) - ->setProject($project) ->trigger(); $queueForEvents->setPayload( @@ -3101,7 +3100,6 @@ App::post('/v1/account/verification/phone') ->setMessage($messageDoc) ->setRecipients([$user->getAttribute('phone')]) ->setProviderType(MESSAGE_TYPE_SMS) - ->setProject($project) ->trigger(); $queueForEvents diff --git a/app/controllers/api/messaging.php b/app/controllers/api/messaging.php index 411243d7b..88734b7c7 100644 --- a/app/controllers/api/messaging.php +++ b/app/controllers/api/messaging.php @@ -34,6 +34,7 @@ use Utopia\Validator\Boolean; use Utopia\Validator\JSON; use Utopia\Validator\Text; use MaxMind\Db\Reader; +use Utopia\Database\DateTime; use Utopia\Validator\WhiteList; use function Swoole\Coroutine\batch; @@ -1441,7 +1442,6 @@ App::patch('/v1/messaging/providers/fcm/:providerId') ->label('audits.resource', 'provider/{response.$id}') ->label('event', 'providers.[providerId].update') ->label('scope', 'providers.write') - ->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY]) ->label('sdk.namespace', 'messaging') ->label('sdk.method', 'updateFcmProvider') ->label('sdk.description', '/docs/references/messaging/update-fcm-provider.md') @@ -2228,10 +2228,11 @@ App::post('/v1/messaging/messages/email') ->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true) ->inject('queueForEvents') ->inject('dbForProject') + ->inject('dbForConsole') ->inject('project') ->inject('queueForMessaging') ->inject('response') - ->action(function (string $messageId, string $subject, string $content, array $topics, array $users, array $targets, string $description, string $status, bool $html, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) { + ->action(function (string $messageId, string $subject, string $content, array $topics, array $users, array $targets, string $description, string $status, bool $html, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) { $messageId = $messageId == 'unique()' ? ID::unique() : $messageId; if (\count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) { @@ -2245,6 +2246,7 @@ App::post('/v1/messaging/messages/email') 'users' => $users, 'targets' => $targets, 'description' => $description, + 'scheduledAt' => $scheduledAt, 'data' => [ 'subject' => $subject, 'content' => $content, @@ -2253,11 +2255,24 @@ App::post('/v1/messaging/messages/email') 'status' => $status, ])); - if ($status === 'processing') { + if ($status === 'processing' && $scheduledAt === null) { $queueForMessaging ->setMessageId($message->getId()) - ->setProject($project) ->trigger(); + } else if ($scheduledAt !== null) { + $schedule = $dbForConsole->createDocument('schedules', new Document([ + 'region' => App::getEnv('_APP_REGION', 'default'), + 'resourceType' => 'message', + 'resourceId' => $message->getId(), + 'resourceInternalId' => $message->getInternalId(), + 'resourceUpdatedAt' => DateTime::now(), + 'projectId' => $project->getId(), + 'schedule' => $message->getAttribute('scheduledAt'), + 'active' => $status === 'processing' ? true : false, + ])); + + $message->setAttribute('scheduleId', $schedule->getId()); + $dbForProject->updateDocument('messages', $message->getId(), $message); } $queueForEvents @@ -2292,10 +2307,11 @@ App::post('/v1/messaging/messages/sms') ->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true) ->inject('queueForEvents') ->inject('dbForProject') + ->inject('dbForConsole') ->inject('project') ->inject('queueForMessaging') ->inject('response') - ->action(function (string $messageId, string $content, array $topics, array $users, array $targets, string $description, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) { + ->action(function (string $messageId, string $content, array $topics, array $users, array $targets, string $description, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) { $messageId = $messageId == 'unique()' ? ID::unique() : $messageId; if (\count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) { @@ -2315,11 +2331,24 @@ App::post('/v1/messaging/messages/sms') 'status' => $status, ])); - if ($status === 'processing') { + if ($status === 'processing' && $scheduledAt === null) { $queueForMessaging ->setMessageId($message->getId()) - ->setProject($project) ->trigger(); + } else if ($status === 'processing' && $scheduledAt !== null) { + $schedule = $dbForConsole->createDocument('schedules', new Document([ + 'region' => App::getEnv('_APP_REGION', 'default'), + 'resourceType' => 'message', + 'resourceId' => $message->getId(), + 'resourceInternalId' => $message->getInternalId(), + 'resourceUpdatedAt' => DateTime::now(), + 'projectId' => $project->getId(), + 'schedule' => $message->getAttribute('scheduledAt'), + 'active' => $status === 'processing' ? true : false, + ])); + + $message->setAttribute('scheduleId', $schedule->getId()); + $dbForProject->updateDocument('messages', $message->getId(), $message); } $queueForEvents @@ -2362,10 +2391,11 @@ App::post('/v1/messaging/messages/push') ->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true) ->inject('queueForEvents') ->inject('dbForProject') + ->inject('dbForConsole') ->inject('project') ->inject('queueForMessaging') ->inject('response') - ->action(function (string $messageId, string $title, string $body, array $topics, array $users, array $targets, string $description, ?array $data, string $action, string $icon, string $sound, string $color, string $tag, string $badge, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) { + ->action(function (string $messageId, string $title, string $body, array $topics, array $users, array $targets, string $description, ?array $data, string $action, string $icon, string $sound, string $color, string $tag, string $badge, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) { $messageId = $messageId == 'unique()' ? ID::unique() : $messageId; if (\count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) { @@ -2394,11 +2424,24 @@ App::post('/v1/messaging/messages/push') 'status' => $status, ])); - if ($status === 'processing') { + if ($status === 'processing' && $scheduledAt === null) { $queueForMessaging ->setMessageId($message->getId()) - ->setProject($project) ->trigger(); + } else if ($status === 'processing' && $scheduledAt !== null) { + $schedule = $dbForConsole->createDocument('schedules', new Document([ + 'region' => App::getEnv('_APP_REGION', 'default'), + 'resourceType' => 'message', + 'resourceId' => $message->getId(), + 'resourceInternalId' => $message->getInternalId(), + 'resourceUpdatedAt' => DateTime::now(), + 'projectId' => $project->getId(), + 'schedule' => $message->getAttribute('scheduledAt'), + 'active' => $status === 'processing' ? true : false, + ])); + + $message->setAttribute('scheduleId', $schedule->getId()); + $dbForProject->updateDocument('messages', $message->getId(), $message); } $queueForEvents @@ -2586,10 +2629,11 @@ App::patch('/v1/messaging/messages/email/:messageId') ->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true) ->inject('queueForEvents') ->inject('dbForProject') + ->inject('dbForConsole') ->inject('project') ->inject('queueForMessaging') ->inject('response') - ->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, string $subject, string $description, string $content, string $status, bool $html, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) { + ->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, string $subject, string $description, string $content, string $status, bool $html, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) { $message = $dbForProject->getDocument('messages', $messageId); if ($message->isEmpty()) { @@ -2642,14 +2686,25 @@ App::patch('/v1/messaging/messages/email/:messageId') if (!is_null($scheduledAt)) { $message->setAttribute('scheduledAt', $scheduledAt); + + $schedule = $dbForConsole->getDocument('schedules', $message->getAttribute('scheduleId')); + + $schedule + ->setAttribute('resourceUpdatedAt', DateTime::now()) + ->setAttribute('schedule', $message->getAttribute('schedule')); + + if ($message->getAttribute('status') === 'processing') { + $schedule->setAttribute('active', true); + } + + $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule); } $message = $dbForProject->updateDocument('messages', $message->getId(), $message); - if ($status === 'processing') { + if ($status === 'processing' && \is_null($message->getAttribute('scheduledAt'))) { $queueForMessaging ->setMessageId($message->getId()) - ->setProject($project) ->trigger(); } @@ -2684,10 +2739,11 @@ App::patch('/v1/messaging/messages/sms/:messageId') ->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true) ->inject('queueForEvents') ->inject('dbForProject') + ->inject('dbForConsole') ->inject('project') ->inject('queueForMessaging') ->inject('response') - ->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, string $description, string $content, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) { + ->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, string $description, string $content, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) { $message = $dbForProject->getDocument('messages', $messageId); if ($message->isEmpty()) { @@ -2732,14 +2788,25 @@ App::patch('/v1/messaging/messages/sms/:messageId') if (!is_null($scheduledAt)) { $message->setAttribute('scheduledAt', $scheduledAt); + + $schedule = $dbForConsole->getDocument('schedules', $message->getAttribute('scheduleId')); + + $schedule + ->setAttribute('resourceUpdatedAt', DateTime::now()) + ->setAttribute('schedule', $message->getAttribute('schedule')); + + if ($message->getAttribute('status') === 'processing') { + $schedule->setAttribute('active', true); + } + + $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule); } $message = $dbForProject->updateDocument('messages', $message->getId(), $message); - if ($status === 'processing') { + if ($status === 'processing' && \is_null($message->getAttribute('scheduledAt'))) { $queueForMessaging ->setMessageId($message->getId()) - ->setProject($project) ->trigger(); } @@ -2781,10 +2848,11 @@ App::patch('/v1/messaging/messages/push/:messageId') ->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true) ->inject('queueForEvents') ->inject('dbForProject') + ->inject('dbForConsole') ->inject('project') ->inject('queueForMessaging') ->inject('response') - ->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, string $description, string $title, string $body, ?array $data, string $action, string $icon, string $sound, string $color, string $tag, string $badge, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) { + ->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, string $description, string $title, string $body, ?array $data, string $action, string $icon, string $sound, string $color, string $tag, string $badge, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) { $message = $dbForProject->getDocument('messages', $messageId); if ($message->isEmpty()) { @@ -2861,16 +2929,27 @@ App::patch('/v1/messaging/messages/push/:messageId') if (!is_null($scheduledAt)) { $message->setAttribute('scheduledAt', $scheduledAt); + + $schedule = $dbForConsole->getDocument('schedules', $message->getAttribute('scheduleId')); + + $schedule + ->setAttribute('resourceUpdatedAt', DateTime::now()) + ->setAttribute('schedule', $message->getAttribute('schedule')); + + if ($message->getAttribute('status') === 'processing') { + $schedule->setAttribute('active', true); + } + + $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule); } $message = $dbForProject->updateDocument('messages', $message->getId(), $message); - if ($status === 'processing') { + if ($status === 'processing' && \is_null($message->getAttribute('scheduledAt'))) { $queueForMessaging ->setMessageId($message->getId()) - ->setProject($project) ->trigger(); - } + } $queueForEvents ->setParam('messageId', $message->getId()); diff --git a/app/controllers/api/teams.php b/app/controllers/api/teams.php index 2ba27efcb..14ae4723b 100644 --- a/app/controllers/api/teams.php +++ b/app/controllers/api/teams.php @@ -653,7 +653,6 @@ App::post('/v1/teams/:teamId/memberships') ->setMessage($messageDoc) ->setRecipients([$phone]) ->setProviderType('SMS') - ->setProject($project) ->trigger(); } } diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index b37d76a81..a988cb7e6 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -7,6 +7,7 @@ use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; +use Appwrite\Event\Messaging; use Appwrite\Extend\Exception; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Usage\Stats; @@ -97,6 +98,7 @@ App::init() ->inject('project') ->inject('user') ->inject('queueForEvents') + ->inject('queueForMessaging') ->inject('queueForAudits') ->inject('queueForDeletes') ->inject('queueForDatabase') @@ -104,7 +106,7 @@ App::init() ->inject('mode') ->inject('queueForMails') ->inject('usage') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Database $dbForProject, string $mode, Mail $queueForMails, Stats $usage) use ($databaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Database $dbForProject, string $mode, Mail $queueForMails, Stats $usage) use ($databaseListener) { $route = $utopia->getRoute(); @@ -178,6 +180,9 @@ App::init() ->setProject($project) ->setUser($user); + $queueForMessaging + ->setProject($project); + $queueForAudits ->setMode($mode) ->setUserAgent($request->getUserAgent('')) diff --git a/bin/schedule-message b/bin/schedule-message new file mode 100644 index 000000000..62e0fdbe6 --- /dev/null +++ b/bin/schedule-message @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/cli.php schedule-message $@ \ No newline at end of file diff --git a/composer.lock b/composer.lock index 16f44a635..88a037b23 100644 --- a/composer.lock +++ b/composer.lock @@ -402,16 +402,16 @@ }, { "name": "guzzlehttp/guzzle", - "version": "7.8.0", + "version": "7.8.1", "source": { "type": "git", "url": "https://github.com/guzzle/guzzle.git", - "reference": "1110f66a6530a40fe7aea0378fe608ee2b2248f9" + "reference": "41042bc7ab002487b876a0683fc8dce04ddce104" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/guzzle/guzzle/zipball/1110f66a6530a40fe7aea0378fe608ee2b2248f9", - "reference": "1110f66a6530a40fe7aea0378fe608ee2b2248f9", + "url": "https://api.github.com/repos/guzzle/guzzle/zipball/41042bc7ab002487b876a0683fc8dce04ddce104", + "reference": "41042bc7ab002487b876a0683fc8dce04ddce104", "shasum": "" }, "require": { @@ -426,11 +426,11 @@ "psr/http-client-implementation": "1.0" }, "require-dev": { - "bamarni/composer-bin-plugin": "^1.8.1", + "bamarni/composer-bin-plugin": "^1.8.2", "ext-curl": "*", "php-http/client-integration-tests": "dev-master#2c025848417c1135031fdf9c728ee53d0a7ceaee as 3.0.999", "php-http/message-factory": "^1.1", - "phpunit/phpunit": "^8.5.29 || ^9.5.23", + "phpunit/phpunit": "^8.5.36 || ^9.6.15", "psr/log": "^1.1 || ^2.0 || ^3.0" }, "suggest": { @@ -508,7 +508,7 @@ ], "support": { "issues": "https://github.com/guzzle/guzzle/issues", - "source": "https://github.com/guzzle/guzzle/tree/7.8.0" + "source": "https://github.com/guzzle/guzzle/tree/7.8.1" }, "funding": [ { @@ -524,28 +524,28 @@ "type": "tidelift" } ], - "time": "2023-08-27T10:20:53+00:00" + "time": "2023-12-03T20:35:24+00:00" }, { "name": "guzzlehttp/promises", - "version": "2.0.1", + "version": "2.0.2", "source": { "type": "git", "url": "https://github.com/guzzle/promises.git", - "reference": "111166291a0f8130081195ac4556a5587d7f1b5d" + "reference": "bbff78d96034045e58e13dedd6ad91b5d1253223" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/guzzle/promises/zipball/111166291a0f8130081195ac4556a5587d7f1b5d", - "reference": "111166291a0f8130081195ac4556a5587d7f1b5d", + "url": "https://api.github.com/repos/guzzle/promises/zipball/bbff78d96034045e58e13dedd6ad91b5d1253223", + "reference": "bbff78d96034045e58e13dedd6ad91b5d1253223", "shasum": "" }, "require": { "php": "^7.2.5 || ^8.0" }, "require-dev": { - "bamarni/composer-bin-plugin": "^1.8.1", - "phpunit/phpunit": "^8.5.29 || ^9.5.23" + "bamarni/composer-bin-plugin": "^1.8.2", + "phpunit/phpunit": "^8.5.36 || ^9.6.15" }, "type": "library", "extra": { @@ -591,7 +591,7 @@ ], "support": { "issues": "https://github.com/guzzle/promises/issues", - "source": "https://github.com/guzzle/promises/tree/2.0.1" + "source": "https://github.com/guzzle/promises/tree/2.0.2" }, "funding": [ { @@ -607,20 +607,20 @@ "type": "tidelift" } ], - "time": "2023-08-03T15:11:55+00:00" + "time": "2023-12-03T20:19:20+00:00" }, { "name": "guzzlehttp/psr7", - "version": "2.6.1", + "version": "2.6.2", "source": { "type": "git", "url": "https://github.com/guzzle/psr7.git", - "reference": "be45764272e8873c72dbe3d2edcfdfcc3bc9f727" + "reference": "45b30f99ac27b5ca93cb4831afe16285f57b8221" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/guzzle/psr7/zipball/be45764272e8873c72dbe3d2edcfdfcc3bc9f727", - "reference": "be45764272e8873c72dbe3d2edcfdfcc3bc9f727", + "url": "https://api.github.com/repos/guzzle/psr7/zipball/45b30f99ac27b5ca93cb4831afe16285f57b8221", + "reference": "45b30f99ac27b5ca93cb4831afe16285f57b8221", "shasum": "" }, "require": { @@ -634,9 +634,9 @@ "psr/http-message-implementation": "1.0" }, "require-dev": { - "bamarni/composer-bin-plugin": "^1.8.1", + "bamarni/composer-bin-plugin": "^1.8.2", "http-interop/http-factory-tests": "^0.9", - "phpunit/phpunit": "^8.5.29 || ^9.5.23" + "phpunit/phpunit": "^8.5.36 || ^9.6.15" }, "suggest": { "laminas/laminas-httphandlerrunner": "Emit PSR-7 responses" @@ -707,7 +707,7 @@ ], "support": { "issues": "https://github.com/guzzle/psr7/issues", - "source": "https://github.com/guzzle/psr7/tree/2.6.1" + "source": "https://github.com/guzzle/psr7/tree/2.6.2" }, "funding": [ { @@ -723,7 +723,7 @@ "type": "tidelift" } ], - "time": "2023-08-27T10:13:57+00:00" + "time": "2023-12-03T20:05:35+00:00" }, { "name": "influxdb/influxdb-php", @@ -5823,5 +5823,5 @@ "platform-overrides": { "php": "8.0" }, - "plugin-api-version": "2.3.0" + "plugin-api-version": "2.6.0" } diff --git a/docker-compose.yml b/docker-compose.yml index a570c5b61..d51dffe47 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -717,6 +717,33 @@ services: - _APP_DB_USER - _APP_DB_PASS + appwrite-schedule-message: + entrypoint: schedule-message + <<: *x-logging + container_name: appwrite-schedule-message + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - mariadb + - redis + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + appwrite-assistant: container_name: appwrite-assistant image: appwrite/assistant:0.2.2 diff --git a/src/Appwrite/Platform/Services/Tasks.php b/src/Appwrite/Platform/Services/Tasks.php index 28d7046dd..29e86b8c7 100644 --- a/src/Appwrite/Platform/Services/Tasks.php +++ b/src/Appwrite/Platform/Services/Tasks.php @@ -20,6 +20,7 @@ use Appwrite\Platform\Tasks\CalcTierStats; use Appwrite\Platform\Tasks\Upgrade; use Appwrite\Platform\Tasks\DeleteOrphanedProjects; use Appwrite\Platform\Tasks\PatchRecreateRepositoriesDocuments; +use Appwrite\Platform\Tasks\ScheduleMessage; class Tasks extends Service { @@ -36,6 +37,7 @@ class Tasks extends Service ->addAction(Install::getName(), new Install()) ->addAction(Upgrade::getName(), new Upgrade()) ->addAction(Maintenance::getName(), new Maintenance()) + ->addAction(ScheduleMessage::getName(), new ScheduleMessage()) ->addAction(Schedule::getName(), new Schedule()) ->addAction(Migrate::getName(), new Migrate()) ->addAction(SDKs::getName(), new SDKs()) diff --git a/src/Appwrite/Platform/Tasks/Maintenance.php b/src/Appwrite/Platform/Tasks/Maintenance.php index 82a62ffed..c9928dd16 100644 --- a/src/Appwrite/Platform/Tasks/Maintenance.php +++ b/src/Appwrite/Platform/Tasks/Maintenance.php @@ -85,7 +85,7 @@ class Maintenance extends Action private function notifyDeleteUsageStats(int $usageStatsRetentionHourly, Delete $queueForDeletes): void { - ($queueForDeletes) + ($queueFor) ->setType(DELETE_TYPE_USAGE) ->setUsageRetentionHourlyDateTime(DateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly)) ->trigger(); diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessage.php b/src/Appwrite/Platform/Tasks/ScheduleMessage.php new file mode 100644 index 000000000..cd29a1dcf --- /dev/null +++ b/src/Appwrite/Platform/Tasks/ScheduleMessage.php @@ -0,0 +1,164 @@ +desc('Execute functions scheduled in Appwrite') + ->inject('pools') + ->inject('dbForConsole') + ->inject('getProjectDB') + ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); + } + + /** + * 1. Load all documents from 'schedules' collection to create local copy + * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute + * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. + */ + public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void + { + Console::title('Scheduler V1'); + Console::success(APP_NAME . ' Scheduler v1 has started'); + + $schedules = []; // Local copy of 'schedules' collection + $lastSyncUpdate = DateTime::now(); + + $limit = 10000; + $sum = $limit; + $total = 0; + $loadStart = \microtime(true); + $latestDocument = null; + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + try{ + + + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::lessThanEqual('schedule', DateTime::formatTz(DateTime::now())), + Query::equal('resourceType', ['message']), + Query::equal('active', [true]), + ])); + } catch (\Exception $e) { + var_dump($e->getTraceAsString()); + } + + $sum = count($results); + $total = $total + $sum; + foreach($results as $schedule) { + $schedules[$schedule->getId()] = $schedule; + } + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $pools->reclaim(); + + Console::success("{$total} message were loaded in " . (microtime(true) - $loadStart) . " seconds"); + + Console::success("Starting timers at " . DateTime::now()); + + run( + function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $pools) { + /** + * The timer synchronize $schedules copy with database collection. + */ + Timer::tick(self::MESSAGE_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $pools) { + $time = DateTime::now(); + $timerStart = \microtime(true); + + $limit = 1000; + $sum = $limit; + $total = 0; + $latestDocument = null; + + Console::log("Sync tick: Running at $time"); + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::lessThanEqual('schedule', DateTime::formatTz(DateTime::now())), + Query::equal('resourceType', ['message']), + Query::equal('active', [true]), + ])); + $sum = \count($results); + $total = $total + $sum; + foreach ($results as $schedule) { + $schedules[$schedule->getId()] = $schedule; + } + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $lastSyncUpdate = $time; + $timerEnd = \microtime(true); + + $pools->reclaim(); + + Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); + }); + + /** + * The timer to prepare soon-to-execute schedules. + */ + $enqueueMessages = function () use (&$schedules, $pools, $dbForConsole) { + foreach ($schedules as $scheduleId => $schedule) { + \go(function () use ($schedules, $schedule, $pools, $dbForConsole) { + $queue = $pools->get('queue')->pop(); + $connection = $queue->getResource(); + $queueForMessaging = new Messaging($connection); + $queueForDeletes = new Delete($connection); + $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); + $queueForMessaging + ->setMessageId($schedule->getAttribute('resourceId')) + ->setProject($project) + ->trigger(); + $schedule->setAttribute('active', false); + $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule); + + $queueForDeletes + ->setType(DELETE_TYPE_SCHEDULES) + ->setDocument($schedule); + + $queue->reclaim(); + unset($schedules[$schedule->getId()]); + }); + } + }; + + Timer::tick(self::MESSAGE_ENQUEUE_TIMER * 1000, fn () => $enqueueMessages()); + $enqueueMessages(); + } + ); + } +} diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index 74365bad8..6bc5db42f 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -148,7 +148,7 @@ class Deletes extends Action $this->deleteCacheByDate($project, $getProjectDB, $datetime); break; case DELETE_TYPE_SCHEDULES: - $this->deleteSchedules($dbForConsole, $getProjectDB, $datetime); + $this->deleteSchedules($dbForConsole, $getProjectDB, $datetime, $document); break; case DELETE_TYPE_TOPIC: $this->deleteTopic($project, $getProjectDB, $document); @@ -167,13 +167,13 @@ class Deletes extends Action * @throws Authorization * @throws Throwable */ - private function deleteSchedules(Database $dbForConsole, callable $getProjectDB, string $datetime): void + private function deleteSchedules(Database $dbForConsole, callable $getProjectDB, string $datetime, ?Document $document = null): void { $this->listByGroup( 'schedules', [ Query::equal('region', [App::getEnv('_APP_REGION', 'default')]), - Query::equal('resourceType', ['function']), + Query::equal('resourceType', [$document ?? $document->getAttribute('resourceType')]), Query::lessThanEqual('resourceUpdatedAt', $datetime), Query::equal('active', [false]), ], diff --git a/src/Appwrite/Platform/Workers/Messaging.php b/src/Appwrite/Platform/Workers/Messaging.php index 36647e9b7..c5a6f9019 100644 --- a/src/Appwrite/Platform/Workers/Messaging.php +++ b/src/Appwrite/Platform/Workers/Messaging.php @@ -106,7 +106,6 @@ class Messaging extends Action $targets = $dbForProject->find('targets', [Query::equal('$id', $targetsId)]); $recipients = \array_merge($recipients, $targets); } - $primaryProvider = $dbForProject->findOne('providers', [ Query::equal('enabled', [true]), Query::equal('type', [$recipients[0]->getAttribute('providerType')]), @@ -155,7 +154,6 @@ class Messaging extends Action $providers[] = $provider; $identifiers = $identifiersByProviderId[$providerId]; - $adapter = match ($provider->getAttribute('type')) { MESSAGE_TYPE_SMS => $this->sms($provider), MESSAGE_TYPE_PUSH => $this->push($provider), diff --git a/tests/e2e/Services/Messaging/MessagingBase.php b/tests/e2e/Services/Messaging/MessagingBase.php index 690e503e7..1ae4e4fec 100644 --- a/tests/e2e/Services/Messaging/MessagingBase.php +++ b/tests/e2e/Services/Messaging/MessagingBase.php @@ -572,7 +572,8 @@ trait MessagingBase 'apiKey' => $apiKey, 'domain' => $domain, 'isEuRegion' => filter_var($isEuRegion, FILTER_VALIDATE_BOOLEAN), - 'from' => $from + 'from' => $from, + 'enabled' => true, ]); $this->assertEquals(201, $provider['headers']['status-code']); @@ -605,18 +606,8 @@ trait MessagingBase $this->assertEquals(201, $user['headers']['status-code']); // Create Target - $target = $this->client->call(Client::METHOD_POST, '/users/' . $user['body']['$id'] . '/targets', [ - 'content-type' => 'application/json', - 'x-appwrite-project' => $this->getProject()['$id'], - 'x-appwrite-key' => $this->getProject()['apiKey'], - ], [ - 'targetId' => ID::unique(), - 'providerType' => 'email', - 'providerId' => $provider['body']['$id'], - 'identifier' => $to, - ]); + $target = $user['body']['targets'][0]; - $this->assertEquals(201, $target['headers']['status-code']); // Create Subscriber $subscriber = $this->client->call(Client::METHOD_POST, '/messaging/topics/' . $topic['body']['$id'] . '/subscribers', \array_merge([ @@ -624,7 +615,7 @@ trait MessagingBase 'x-appwrite-project' => $this->getProject()['$id'], ], $this->getHeaders()), [ 'subscriberId' => ID::unique(), - 'targetId' => $target['body']['$id'], + 'targetId' => $target['$id'], ]); $this->assertEquals(201, $subscriber['headers']['status-code']);