1
0
Fork 0
mirror of synced 2024-06-01 18:39:57 +12:00

Merge branch '1.5.x' into fix-limit-failed-webhook-attempts

This commit is contained in:
Khushboo Verma 2024-01-19 17:12:14 +05:30
commit a134ca7efd
31 changed files with 1233 additions and 546 deletions

View file

@ -76,41 +76,42 @@ RUN chmod +x /usr/local/bin/dev-generate-translations
# Executables
RUN chmod +x /usr/local/bin/doctor && \
chmod +x /usr/local/bin/maintenance && \
chmod +x /usr/local/bin/usage && \
chmod +x /usr/local/bin/install && \
chmod +x /usr/local/bin/upgrade && \
chmod +x /usr/local/bin/maintenance && \
chmod +x /usr/local/bin/migrate && \
chmod +x /usr/local/bin/realtime && \
chmod +x /usr/local/bin/schedule && \
chmod +x /usr/local/bin/schedule-functions && \
chmod +x /usr/local/bin/schedule-messages && \
chmod +x /usr/local/bin/sdks && \
chmod +x /usr/local/bin/specs && \
chmod +x /usr/local/bin/ssl && \
chmod +x /usr/local/bin/test && \
chmod +x /usr/local/bin/upgrade && \
chmod +x /usr/local/bin/usage && \
chmod +x /usr/local/bin/vars && \
chmod +x /usr/local/bin/worker-audits && \
chmod +x /usr/local/bin/worker-builds && \
chmod +x /usr/local/bin/worker-certificates && \
chmod +x /usr/local/bin/worker-databases && \
chmod +x /usr/local/bin/worker-deletes && \
chmod +x /usr/local/bin/worker-functions && \
chmod +x /usr/local/bin/worker-builds && \
chmod +x /usr/local/bin/worker-hamster && \
chmod +x /usr/local/bin/worker-mails && \
chmod +x /usr/local/bin/worker-messaging && \
chmod +x /usr/local/bin/worker-webhooks && \
chmod +x /usr/local/bin/worker-migrations && \
chmod +x /usr/local/bin/worker-hamster
chmod +x /usr/local/bin/worker-webhooks
# Cloud Executabless
RUN chmod +x /usr/local/bin/hamster && \
chmod +x /usr/local/bin/volume-sync && \
RUN chmod +x /usr/local/bin/calc-tier-stats && \
chmod +x /usr/local/bin/calc-users-stats && \
chmod +x /usr/local/bin/clear-card-cache && \
chmod +x /usr/local/bin/delete-orphaned-projects && \
chmod +x /usr/local/bin/get-migration-stats && \
chmod +x /usr/local/bin/hamster && \
chmod +x /usr/local/bin/patch-delete-project-collections && \
chmod +x /usr/local/bin/patch-delete-schedule-updated-at-attribute && \
chmod +x /usr/local/bin/patch-recreate-repositories-documents && \
chmod +x /usr/local/bin/patch-delete-project-collections && \
chmod +x /usr/local/bin/delete-orphaned-projects && \
chmod +x /usr/local/bin/clear-card-cache && \
chmod +x /usr/local/bin/calc-users-stats && \
chmod +x /usr/local/bin/calc-tier-stats && \
chmod +x /usr/local/bin/get-migration-stats
chmod +x /usr/local/bin/volume-sync
# Letsencrypt Permissions
RUN mkdir -p /etc/letsencrypt/live/ && chmod -Rf 755 /etc/letsencrypt/live/

View file

@ -1593,6 +1593,28 @@ $commonCollections = [
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('scheduleInternalId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('scheduleId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('deliveredAt'),
'type' => Database::VAR_DATETIME,
@ -1810,6 +1832,17 @@ $commonCollections = [
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('search'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 16384,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
],
'indexes' => [
[
@ -1853,7 +1886,14 @@ $commonCollections = [
'attributes' => ['topicInternalId'],
'lengths' => [],
'orders' => [],
]
],
[
'$id' => ID::custom('_fulltext_search'),
'type' => Database::INDEX_FULLTEXT,
'attributes' => ['search'],
'lengths' => [],
'orders' => [],
],
],
],
@ -4120,6 +4160,17 @@ $consoleCollections = array_merge([
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('resourceCollection'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => true,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('resourceInternalId'),
'type' => Database::VAR_STRING,

View file

@ -4,6 +4,7 @@
* List of server wide error codes and their respective messages.
*/
use Appwrite\Enum\MessageStatus;
use Appwrite\Extend\Exception;
return [
@ -807,7 +808,7 @@ return [
],
Exception::PROVIDER_INCORRECT_TYPE => [
'name' => Exception::PROVIDER_INCORRECT_TYPE,
'description' => 'Provider with the requested ID is of incorrect type: ',
'description' => 'Provider with the requested ID is of the incorrect type.',
'code' => 400,
],
@ -858,18 +859,27 @@ return [
],
Exception::MESSAGE_TARGET_NOT_EMAIL => [
'name' => Exception::MESSAGE_TARGET_NOT_EMAIL,
'description' => 'Message with the target ID is not an email target:',
'description' => 'Message with the target ID is not an email target.',
'code' => 400,
],
Exception::MESSAGE_TARGET_NOT_SMS => [
'name' => Exception::MESSAGE_TARGET_NOT_SMS,
'description' => 'Message with the target ID is not an SMS target:',
'description' => 'Message with the target ID is not an SMS target.',
'code' => 400,
],
Exception::MESSAGE_TARGET_NOT_PUSH => [
'name' => Exception::MESSAGE_TARGET_NOT_PUSH,
'description' => 'Message with the target ID is not a push target:',
'description' => 'Message with the target ID is not a push target.',
'code' => 400,
],
Exception::MESSAGE_MISSING_SCHEDULE => [
'name' => Exception::MESSAGE_MISSING_SCHEDULE,
'description' => 'Message can not have status ' . MessageStatus::SCHEDULED . ' without a schedule.',
'code' => 400,
],
Exception::SCHEDULE_NOT_FOUND => [
'name' => Exception::SCHEDULE_NOT_FOUND,
'description' => 'Schedule with the requested ID could not be found.',
'code' => 404,
],
];

View file

@ -1509,7 +1509,6 @@ App::post('/v1/account/tokens/phone')
->setMessage($messageDoc)
->setRecipients([$phone])
->setProviderType(MESSAGE_TYPE_SMS)
->setProject($project)
->trigger();
$queueForEvents->setPayload(
@ -1719,7 +1718,7 @@ App::post('/v1/account/jwt')
App::post('/v1/account/targets/push')
->desc('Create Account\'s push target')
->groups(['api', 'account'])
->label('error', __DIR__ . '/../../views/general/error.phtml')
->label('scope', 'account')
->label('audits.event', 'target.create')
->label('audits.resource', 'target/response.$id')
->label('event', 'users.[userId].targets.[targetId].create')
@ -1729,10 +1728,9 @@ App::post('/v1/account/targets/push')
->label('sdk.response.code', Response::STATUS_CODE_CREATED)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
->label('sdk.response.model', Response::MODEL_TARGET)
->label('docs', false)
->param('targetId', '', new CustomId(), 'Target ID. Choose a custom ID or generate a random ID with `ID.unique()`. Valid chars are a-z, A-Z, 0-9, period, hyphen, and underscore. Can\'t start with a special char. Max length is 36 chars.')
->param('providerId', '', new UID(), 'Provider ID. Message will be sent to this target from the specified provider ID. If no provider ID is set the first setup provider will be used.')
->param('identifier', '', new Text(Database::LENGTH_KEY), 'The target identifier (token, email, phone etc.)')
->param('providerId', '', new UID(), 'Provider ID. Message will be sent to this target from the specified provider ID. If no provider ID is set the first setup provider will be used.', true)
->inject('queueForEvents')
->inject('user')
->inject('request')
@ -1743,10 +1741,6 @@ App::post('/v1/account/targets/push')
$provider = Authorization::skip(fn () => $dbForProject->getDocument('providers', $providerId));
if ($provider->isEmpty()) {
throw new Exception(Exception::PROVIDER_NOT_FOUND);
}
if ($user->isEmpty()) {
throw new Exception(Exception::USER_NOT_FOUND);
}
@ -1769,8 +1763,8 @@ App::post('/v1/account/targets/push')
Permission::read(Role::user($user->getId())),
Permission::update(Role::user($user->getId())),
],
'providerId' => $providerId ?? null,
'providerInternalId' => $provider->getInternalId() ?? null,
'providerId' => !empty($providerId) ? $providerId : null,
'providerInternalId' => !empty($providerId) ? $provider->getInternalId() : null,
'providerType' => MESSAGE_TYPE_PUSH,
'userId' => $user->getId(),
'userInternalId' => $user->getInternalId(),
@ -3124,7 +3118,6 @@ App::post('/v1/account/verification/phone')
->setMessage($messageDoc)
->setRecipients([$user->getAttribute('phone')])
->setProviderType(MESSAGE_TYPE_SMS)
->setProject($project)
->trigger();
$queueForEvents
@ -3206,7 +3199,7 @@ App::put('/v1/account/verification/phone')
App::put('/v1/account/targets/:targetId/push')
->desc('Update Account\'s push target')
->groups(['api', 'account'])
->label('error', __DIR__ . '/../../views/general/error.phtml')
->label('scope', 'account')
->label('audits.event', 'target.update')
->label('audits.resource', 'target/response.$id')
->label('event', 'users.[userId].targets.[targetId].update')

View file

@ -228,6 +228,7 @@ App::post('/v1/functions')
fn () => $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION', 'default'), // Todo replace with projects region
'resourceType' => 'function',
'resourceCollection' => 'functions',
'resourceId' => $function->getId(),
'resourceInternalId' => $function->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),

View file

@ -2,6 +2,7 @@
use Appwrite\Auth\Validator\Phone;
use Appwrite\Detector\Detector;
use Appwrite\Enum\MessageStatus;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Messaging;
@ -13,6 +14,7 @@ use Appwrite\Utopia\Database\Validator\CustomId;
use Appwrite\Utopia\Database\Validator\Queries\Messages;
use Appwrite\Utopia\Database\Validator\Queries\Providers;
use Appwrite\Utopia\Database\Validator\Queries\Subscribers;
use Appwrite\Utopia\Database\Validator\Queries\Targets;
use Appwrite\Utopia\Database\Validator\Queries\Topics;
use Appwrite\Utopia\Response;
use Utopia\App;
@ -35,6 +37,7 @@ use Utopia\Validator\Integer;
use Utopia\Validator\JSON;
use Utopia\Validator\Text;
use MaxMind\Db\Reader;
use Utopia\Database\DateTime;
use Utopia\Validator\WhiteList;
use function Swoole\Coroutine\batch;
@ -601,7 +604,7 @@ App::post('/v1/messaging/providers/fcm')
->label('scope', 'providers.write')
->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY])
->label('sdk.namespace', 'messaging')
->label('sdk.method', 'createFcmProvider')
->label('sdk.method', 'createFCMProvider')
->label('sdk.description', '/docs/references/messaging/create-fcm-provider.md')
->label('sdk.response.code', Response::STATUS_CODE_CREATED)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
@ -660,7 +663,7 @@ App::post('/v1/messaging/providers/apns')
->label('scope', 'providers.write')
->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY])
->label('sdk.namespace', 'messaging')
->label('sdk.method', 'createApnsProvider')
->label('sdk.method', 'createAPNSProvider')
->label('sdk.description', '/docs/references/messaging/create-apns-provider.md')
->label('sdk.response.code', Response::STATUS_CODE_CREATED)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
@ -1486,7 +1489,7 @@ App::patch('/v1/messaging/providers/fcm/:providerId')
->label('scope', 'providers.write')
->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY])
->label('sdk.namespace', 'messaging')
->label('sdk.method', 'updateFcmProvider')
->label('sdk.method', 'updateFCMProvider')
->label('sdk.description', '/docs/references/messaging/update-fcm-provider.md')
->label('sdk.response.code', Response::STATUS_CODE_OK)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
@ -1547,7 +1550,7 @@ App::patch('/v1/messaging/providers/apns/:providerId')
->label('scope', 'providers.write')
->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY])
->label('sdk.namespace', 'messaging')
->label('sdk.method', 'updateApnsProvider')
->label('sdk.method', 'updateAPNSProvider')
->label('sdk.description', '/docs/references/messaging/update-apns-provider.md')
->label('sdk.response.code', Response::STATUS_CODE_OK)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
@ -1976,19 +1979,27 @@ App::post('/v1/messaging/topics/:topicId/subscribers')
$user = Authorization::skip(fn () => $dbForProject->getDocument('users', $target->getAttribute('userId')));
$userId = $user->getId();
$subscriber = new Document([
'$id' => $subscriberId,
'$permissions' => [
Permission::read(Role::user($user->getId())),
Permission::delete(Role::user($user->getId())),
Permission::read(Role::user($userId)),
Permission::delete(Role::user($userId)),
],
'topicId' => $topicId,
'topicInternalId' => $topic->getInternalId(),
'targetId' => $targetId,
'targetInternalId' => $target->getInternalId(),
'userId' => $user->getId(),
'userId' => $userId,
'userInternalId' => $user->getInternalId(),
'providerType' => $target->getAttribute('providerType'),
'search' => implode(' ', [
$subscriberId,
$targetId,
$userId,
$target->getAttribute('providerType'),
]),
]);
try {
@ -2253,7 +2264,7 @@ App::post('/v1/messaging/messages/email')
->label('scope', 'messages.write')
->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY])
->label('sdk.namespace', 'messaging')
->label('sdk.method', 'createEmailMessage')
->label('sdk.method', 'createEmail')
->label('sdk.description', '/docs/references/messaging/create-email.md')
->label('sdk.response.code', Response::STATUS_CODE_CREATED)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
@ -2267,23 +2278,28 @@ App::post('/v1/messaging/messages/email')
->param('cc', [], new ArrayList(new UID()), 'Array of target IDs to be added as CC.', true)
->param('bcc', [], new ArrayList(new UID()), 'Array of target IDs to be added as BCC.', true)
->param('description', '', new Text(256), 'Description for message.', true)
->param('status', 'processing', new WhiteList(['draft', 'canceled', 'processing']), 'Message Status. Value must be either draft or cancelled or processing.', true)
->param('status', MessageStatus::DRAFT, new WhiteList([MessageStatus::DRAFT, MessageStatus::SCHEDULED, MessageStatus::PROCESSING]), 'Message Status. Value must be one of: ' . implode(', ', [MessageStatus::DRAFT, MessageStatus::SCHEDULED, MessageStatus::PROCESSING]) . '.', true)
->param('html', false, new Boolean(), 'Is content of type HTML', true)
->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true)
->inject('queueForEvents')
->inject('dbForProject')
->inject('dbForConsole')
->inject('project')
->inject('queueForMessaging')
->inject('response')
->action(function (string $messageId, string $subject, string $content, array $topics, array $users, array $targets, array $cc, array $bcc, string $description, string $status, bool $html, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) {
->action(function (string $messageId, string $subject, string $content, array $topics, array $users, array $targets, array $cc, array $bcc, string $description, string $status, bool $html, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) {
$messageId = $messageId == 'unique()'
? ID::unique()
: $messageId;
if (\count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) {
if ($status !== MessageStatus::DRAFT && \count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) {
throw new Exception(Exception::MESSAGE_MISSING_TARGET);
}
if ($status === MessageStatus::SCHEDULED && \is_null($scheduledAt)) {
throw new Exception(Exception::MESSAGE_MISSING_SCHEDULE);
}
$mergedTargets = \array_merge($targets, $cc, $bcc);
if (!empty($mergedTargets)) {
@ -2311,6 +2327,7 @@ App::post('/v1/messaging/messages/email')
'users' => $users,
'targets' => $targets,
'description' => $description,
'scheduledAt' => $scheduledAt,
'data' => [
'subject' => $subject,
'content' => $content,
@ -2321,11 +2338,35 @@ App::post('/v1/messaging/messages/email')
'status' => $status,
]));
if ($status === 'processing') {
$queueForMessaging
->setMessageId($message->getId())
->setProject($project)
->trigger();
switch ($status) {
case MessageStatus::PROCESSING:
$queueForMessaging
->setMessageId($message->getId())
->trigger();
break;
case MessageStatus::SCHEDULED:
$schedule = $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION', 'default'),
'resourceType' => 'message',
'resourceCollection' => 'messages',
'resourceId' => $message->getId(),
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'active' => true,
]));
$message->setAttribute('scheduleId', $schedule->getId());
$dbForProject->updateDocument(
'messages',
$message->getId(),
$message
);
break;
default:
break;
}
$queueForEvents
@ -2345,7 +2386,7 @@ App::post('/v1/messaging/messages/sms')
->label('scope', 'messages.write')
->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY])
->label('sdk.namespace', 'messaging')
->label('sdk.method', 'createSMSMessage')
->label('sdk.method', 'createSMS')
->label('sdk.description', '/docs/references/messaging/create-sms.md')
->label('sdk.response.code', Response::STATUS_CODE_CREATED)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
@ -2356,35 +2397,42 @@ App::post('/v1/messaging/messages/sms')
->param('users', [], new ArrayList(new UID()), 'List of User IDs.', true)
->param('targets', [], new ArrayList(new UID()), 'List of Targets IDs.', true)
->param('description', '', new Text(256), 'Description for Message.', true)
->param('status', 'processing', new WhiteList(['draft', 'canceled', 'processing']), 'Message Status. Value must be either draft or cancelled or processing.', true)
->param('status', MessageStatus::DRAFT, new WhiteList([MessageStatus::DRAFT, MessageStatus::SCHEDULED, MessageStatus::PROCESSING]), 'Message Status. Value must be one of: ' . implode(', ', [MessageStatus::DRAFT, MessageStatus::SCHEDULED, MessageStatus::PROCESSING]) . '.', true)
->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true)
->inject('queueForEvents')
->inject('dbForProject')
->inject('dbForConsole')
->inject('project')
->inject('queueForMessaging')
->inject('response')
->action(function (string $messageId, string $content, array $topics, array $users, array $targets, string $description, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) {
->action(function (string $messageId, string $content, array $topics, array $users, array $targets, string $description, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) {
$messageId = $messageId == 'unique()'
? ID::unique()
: $messageId;
if (\count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) {
if ($status !== MessageStatus::DRAFT && \count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) {
throw new Exception(Exception::MESSAGE_MISSING_TARGET);
}
$foundTargets = $dbForProject->find('targets', [
Query::equal('$id', $targets),
Query::equal('providerType', [MESSAGE_TYPE_SMS]),
Query::limit(\count($targets)),
]);
if (\count($foundTargets) !== \count($targets)) {
throw new Exception(Exception::MESSAGE_TARGET_NOT_SMS);
if ($status === MessageStatus::SCHEDULED && \is_null($scheduledAt)) {
throw new Exception(Exception::MESSAGE_MISSING_SCHEDULE);
}
foreach ($foundTargets as $target) {
if ($target->isEmpty()) {
throw new Exception(Exception::USER_TARGET_NOT_FOUND);
if (!empty($targets)) {
$foundTargets = $dbForProject->find('targets', [
Query::equal('$id', $targets),
Query::equal('providerType', [MESSAGE_TYPE_SMS]),
Query::limit(\count($targets)),
]);
if (\count($foundTargets) !== \count($targets)) {
throw new Exception(Exception::MESSAGE_TARGET_NOT_SMS);
}
foreach ($foundTargets as $target) {
if ($target->isEmpty()) {
throw new Exception(Exception::USER_TARGET_NOT_FOUND);
}
}
}
@ -2401,11 +2449,35 @@ App::post('/v1/messaging/messages/sms')
'status' => $status,
]));
if ($status === 'processing') {
$queueForMessaging
->setMessageId($message->getId())
->setProject($project)
->trigger();
switch ($status) {
case MessageStatus::PROCESSING:
$queueForMessaging
->setMessageId($message->getId())
->trigger();
break;
case MessageStatus::SCHEDULED:
$schedule = $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION', 'default'),
'resourceType' => 'message',
'resourceCollection' => 'messages',
'resourceId' => $message->getId(),
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'active' => true,
]));
$message->setAttribute('scheduleId', $schedule->getId());
$dbForProject->updateDocument(
'messages',
$message->getId(),
$message
);
break;
default:
break;
}
$queueForEvents
@ -2425,7 +2497,7 @@ App::post('/v1/messaging/messages/push')
->label('scope', 'messages.write')
->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY])
->label('sdk.namespace', 'messaging')
->label('sdk.method', 'createPushMessage')
->label('sdk.method', 'createPush')
->label('sdk.description', '/docs/references/messaging/create-push-notification.md')
->label('sdk.response.code', Response::STATUS_CODE_CREATED)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
@ -2444,35 +2516,42 @@ App::post('/v1/messaging/messages/push')
->param('color', '', new Text(256), 'Color for push notification. Available only for Android Platform.', true)
->param('tag', '', new Text(256), 'Tag for push notification. Available only for Android Platform.', true)
->param('badge', '', new Text(256), 'Badge for push notification. Available only for IOS Platform.', true)
->param('status', 'processing', new WhiteList(['draft', 'canceled', 'processing']), 'Message Status. Value must be either draft or cancelled or processing.', true)
->param('status', MessageStatus::DRAFT, new WhiteList([MessageStatus::DRAFT, MessageStatus::SCHEDULED, MessageStatus::PROCESSING]), 'Message Status. Value must be one of: ' . implode(', ', [MessageStatus::DRAFT, MessageStatus::SCHEDULED, MessageStatus::PROCESSING]) . '.', true)
->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true)
->inject('queueForEvents')
->inject('dbForProject')
->inject('dbForConsole')
->inject('project')
->inject('queueForMessaging')
->inject('response')
->action(function (string $messageId, string $title, string $body, array $topics, array $users, array $targets, string $description, ?array $data, string $action, string $icon, string $sound, string $color, string $tag, string $badge, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) {
->action(function (string $messageId, string $title, string $body, array $topics, array $users, array $targets, string $description, ?array $data, string $action, string $icon, string $sound, string $color, string $tag, string $badge, string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) {
$messageId = $messageId == 'unique()'
? ID::unique()
: $messageId;
if (\count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) {
if ($status !== MessageStatus::DRAFT && \count($topics) === 0 && \count($users) === 0 && \count($targets) === 0) {
throw new Exception(Exception::MESSAGE_MISSING_TARGET);
}
$foundTargets = $dbForProject->find('targets', [
Query::equal('$id', $targets),
Query::equal('providerType', [MESSAGE_TYPE_PUSH]),
Query::limit(\count($targets)),
]);
if (\count($foundTargets) !== \count($targets)) {
throw new Exception(Exception::MESSAGE_TARGET_NOT_PUSH);
if ($status === MessageStatus::SCHEDULED && \is_null($scheduledAt)) {
throw new Exception(Exception::MESSAGE_MISSING_SCHEDULE);
}
foreach ($foundTargets as $target) {
if ($target->isEmpty()) {
throw new Exception(Exception::USER_TARGET_NOT_FOUND);
if (!empty($targets)) {
$foundTargets = $dbForProject->find('targets', [
Query::equal('$id', $targets),
Query::equal('providerType', [MESSAGE_TYPE_PUSH]),
Query::limit(\count($targets)),
]);
if (\count($foundTargets) !== \count($targets)) {
throw new Exception(Exception::MESSAGE_TARGET_NOT_PUSH);
}
foreach ($foundTargets as $target) {
if ($target->isEmpty()) {
throw new Exception(Exception::USER_TARGET_NOT_FOUND);
}
}
}
@ -2498,11 +2577,35 @@ App::post('/v1/messaging/messages/push')
'status' => $status,
]));
if ($status === 'processing') {
$queueForMessaging
->setMessageId($message->getId())
->setProject($project)
->trigger();
switch ($status) {
case MessageStatus::PROCESSING:
$queueForMessaging
->setMessageId($message->getId())
->trigger();
break;
case MessageStatus::SCHEDULED:
$schedule = $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION', 'default'),
'resourceType' => 'message',
'resourceCollection' => 'messages',
'resourceId' => $message->getId(),
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'active' => true,
]));
$message->setAttribute('scheduleId', $schedule->getId());
$dbForProject->updateDocument(
'messages',
$message->getId(),
$message
);
break;
default:
break;
}
$queueForEvents
@ -2524,7 +2627,7 @@ App::get('/v1/messaging/messages')
->label('sdk.response.code', Response::STATUS_CODE_OK)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
->label('sdk.response.model', Response::MODEL_MESSAGE_LIST)
->param('queries', [], new Messages(), 'Array of query strings generated using the Query class provided by the SDK. [Learn more about queries](https://appwrite.io/docs/queries). Maximum of ' . APP_LIMIT_ARRAY_PARAMS_SIZE . ' queries are allowed, each ' . APP_LIMIT_ARRAY_ELEMENT_SIZE . ' characters long. You may filter on the following attributes: ' . implode(', ', Providers::ALLOWED_ATTRIBUTES), true)
->param('queries', [], new Messages(), 'Array of query strings generated using the Query class provided by the SDK. [Learn more about queries](https://appwrite.io/docs/queries). Maximum of ' . APP_LIMIT_ARRAY_PARAMS_SIZE . ' queries are allowed, each ' . APP_LIMIT_ARRAY_ELEMENT_SIZE . ' characters long. You may filter on the following attributes: ' . implode(', ', Messages::ALLOWED_ATTRIBUTES), true)
->param('search', '', new Text(256), 'Search term to filter your list results. Max length: 256 chars.', true)
->inject('dbForProject')
->inject('response')
@ -2640,6 +2743,65 @@ App::get('/v1/messaging/messages/:messageId/logs')
]), Response::MODEL_LOG_LIST);
});
App::get('/v1/messaging/messages/:messageId/targets')
->desc('List message targets')
->groups(['api', 'messaging'])
->label('scope', 'messages.read')
->label('sdk.auth', [APP_AUTH_TYPE_ADMIN, APP_AUTH_TYPE_KEY])
->label('sdk.namespace', 'messaging')
->label('sdk.method', 'listTargets')
->label('sdk.description', '/docs/references/messaging/list-message-targets.md')
->label('sdk.response.code', Response::STATUS_CODE_OK)
->label('sdk.response.type', Response::CONTENT_TYPE_JSON)
->label('sdk.response.model', Response::MODEL_TARGET_LIST)
->param('messageId', '', new UID(), 'Message ID.')
->param('queries', [], new Targets(), 'Array of query strings generated using the Query class provided by the SDK. [Learn more about queries](https://appwrite.io/docs/queries). Maximum of ' . APP_LIMIT_ARRAY_PARAMS_SIZE . ' queries are allowed, each ' . APP_LIMIT_ARRAY_ELEMENT_SIZE . ' characters long. You may filter on the following attributes: ' . implode(', ', Targets::ALLOWED_ATTRIBUTES), true)
->inject('response')
->inject('dbForProject')
->inject('locale')
->inject('geodb')
->action(function (string $messageId, array $queries, Response $response, Database $dbForProject, Locale $locale, Reader $geodb) {
$message = $dbForProject->getDocument('messages', $messageId);
if ($message->isEmpty()) {
throw new Exception(Exception::MESSAGE_NOT_FOUND);
}
$targetIDs = $message->getAttribute('targets');
if (empty($targetIDs)) {
$response->dynamic(new Document([
'targets' => [],
'total' => 0,
]), Response::MODEL_TARGET_LIST);
return;
}
$queries = Query::parseQueries($queries);
$queries[] = Query::equal('$id', $targetIDs);
// Get cursor document if there was a cursor query
$cursor = Query::getByType($queries, [Query::TYPE_CURSORAFTER, Query::TYPE_CURSORBEFORE]);
$cursor = reset($cursor);
if ($cursor) {
$targetId = $cursor->getValue();
$cursorDocument = $dbForProject->getDocument('targets', $targetId);
if ($cursorDocument->isEmpty()) {
throw new Exception(Exception::GENERAL_CURSOR_NOT_FOUND, "Target '{$targetId}' for the 'cursor' value not found.");
}
$cursor->setValue($cursorDocument);
}
$response->dynamic(new Document([
'targets' => $dbForProject->find('targets', $queries),
'total' => $dbForProject->count('targets', $queries, APP_LIMIT_COUNT),
]), Response::MODEL_TARGET_LIST);
});
App::get('/v1/messaging/messages/:messageId')
->desc('Get a message')
->groups(['api', 'messaging'])
@ -2685,24 +2847,25 @@ App::patch('/v1/messaging/messages/email/:messageId')
->param('subject', null, new Text(998), 'Email Subject.', true)
->param('description', null, new Text(256), 'Description for Message.', true)
->param('content', null, new Text(64230), 'Email Content.', true)
->param('status', null, new WhiteList(['draft', 'cancelled', 'processing']), 'Message Status. Value must be either draft or cancelled or processing.', true)
->param('status', MessageStatus::DRAFT, new WhiteList([MessageStatus::DRAFT, MessageStatus::SCHEDULED, MessageStatus::PROCESSING]), 'Message Status. Value must be one of: ' . implode(', ', [MessageStatus::DRAFT, MessageStatus::SCHEDULED, MessageStatus::PROCESSING]) . '.', true)
->param('html', null, new Boolean(), 'Is content of type HTML', true)
->param('cc', null, new ArrayList(new UID()), 'Array of target IDs to be added as CC.', true)
->param('bcc', null, new ArrayList(new UID()), 'Array of target IDs to be added as BCC.', true)
->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true)
->inject('queueForEvents')
->inject('dbForProject')
->inject('dbForConsole')
->inject('project')
->inject('queueForMessaging')
->inject('response')
->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, ?string $subject, ?string $description, ?string $content, ?string $status, ?bool $html, ?array $cc, ?array $bcc, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) {
->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, ?string $subject, ?string $description, ?string $content, ?string $status, ?bool $html, ?array $cc, ?array $bcc, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) {
$message = $dbForProject->getDocument('messages', $messageId);
if ($message->isEmpty()) {
throw new Exception(Exception::MESSAGE_NOT_FOUND);
}
if ($message->getAttribute('status') === 'sent') {
if ($message->getAttribute('status') === MessageStatus::SENT) {
throw new Exception(Exception::MESSAGE_ALREADY_SENT);
}
@ -2718,30 +2881,12 @@ App::patch('/v1/messaging/messages/email/:messageId')
$message->setAttribute('users', $users);
}
if (!\is_null($targets) || !\is_null($cc) || !\is_null($bcc)) {
$mergedTargets = \array_merge(...\array_filter([$targets, $cc, $bcc]));
$foundTargets = $dbForProject->find('targets', [
Query::equal('$id', $mergedTargets),
Query::equal('providerType', [MESSAGE_TYPE_EMAIL]),
Query::limit(\count($mergedTargets)),
]);
if (\count($foundTargets) !== \count($mergedTargets)) {
throw new Exception(Exception::MESSAGE_TARGET_NOT_EMAIL);
}
foreach ($foundTargets as $target) {
if ($target->isEmpty()) {
throw new Exception(Exception::USER_TARGET_NOT_FOUND);
}
}
}
$data = $message->getAttribute('data');
if (!\is_null($targets)) {
$message->setAttribute('targets', $targets);
}
$data = $message->getAttribute('data');
if (!\is_null($subject)) {
$data['subject'] = $subject;
}
@ -2772,16 +2917,44 @@ App::patch('/v1/messaging/messages/email/:messageId')
$message->setAttribute('status', $status);
}
if (!is_null($scheduledAt)) {
$message->setAttribute('scheduledAt', $scheduledAt);
if (!\is_null($scheduledAt)) {
if (\is_null($message->getAttribute(('scheduleId')))) {
$schedule = $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION', 'default'),
'resourceType' => 'message',
'resourceCollection' => 'messages',
'resourceId' => $message->getId(),
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'active' => $status === 'processing',
]));
$message->setAttribute('scheduleId', $schedule->getId());
} else {
$schedule = $dbForConsole->getDocument('schedules', $message->getAttribute('scheduleId'));
if ($schedule->isEmpty()) {
throw new Exception(Exception::SCHEDULE_NOT_FOUND);
}
$schedule
->setAttribute('resourceUpdatedAt', DateTime::now())
->setAttribute('schedule', $scheduledAt)
->setAttribute('active', $status === 'processing');
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
}
$message->setAttribute('scheduleId', $schedule->getId());
}
$message = $dbForProject->updateDocument('messages', $message->getId(), $message);
if ($status === 'processing') {
if ($status === MessageStatus::PROCESSING) {
$queueForMessaging
->setMessageId($message->getId())
->setProject($project)
->trigger();
}
@ -2816,10 +2989,11 @@ App::patch('/v1/messaging/messages/sms/:messageId')
->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true)
->inject('queueForEvents')
->inject('dbForProject')
->inject('dbForConsole')
->inject('project')
->inject('queueForMessaging')
->inject('response')
->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, ?string $description, ?string $content, ?string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) {
->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, ?string $description, ?string $content, ?string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) {
$message = $dbForProject->getDocument('messages', $messageId);
if ($message->isEmpty()) {
@ -2843,22 +3017,6 @@ App::patch('/v1/messaging/messages/sms/:messageId')
}
if (!\is_null($targets)) {
$foundTargets = $dbForProject->find('targets', [
Query::equal('$id', $targets),
Query::equal('providerType', [MESSAGE_TYPE_SMS]),
Query::limit(\count($targets)),
]);
if (\count($foundTargets) !== \count($targets)) {
throw new Exception(Exception::MESSAGE_TARGET_NOT_SMS);
}
foreach ($foundTargets as $target) {
if ($target->isEmpty()) {
throw new Exception(Exception::USER_TARGET_NOT_FOUND);
}
}
$message->setAttribute('targets', $targets);
}
@ -2878,16 +3036,44 @@ App::patch('/v1/messaging/messages/sms/:messageId')
$message->setAttribute('description', $description);
}
if (!is_null($scheduledAt)) {
$message->setAttribute('scheduledAt', $scheduledAt);
if (!\is_null($scheduledAt)) {
if (\is_null($message->getAttribute(('scheduleId')))) {
$schedule = $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION', 'default'),
'resourceType' => 'message',
'resourceCollection' => 'messages',
'resourceId' => $message->getId(),
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'active' => $status === 'processing',
]));
$message->setAttribute('scheduleId', $schedule->getId());
} else {
$schedule = $dbForConsole->getDocument('schedules', $message->getAttribute('scheduleId'));
if ($schedule->isEmpty()) {
throw new Exception(Exception::SCHEDULE_NOT_FOUND);
}
$schedule
->setAttribute('resourceUpdatedAt', DateTime::now())
->setAttribute('schedule', $scheduledAt)
->setAttribute('active', $status === 'processing');
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
}
$message->setAttribute('scheduleId', $schedule->getId());
}
$message = $dbForProject->updateDocument('messages', $message->getId(), $message);
if ($status === 'processing') {
if ($status === 'processing' && \is_null($message->getAttribute('scheduledAt'))) {
$queueForMessaging
->setMessageId($message->getId())
->setProject($project)
->trigger();
}
@ -2930,10 +3116,11 @@ App::patch('/v1/messaging/messages/push/:messageId')
->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled delivery time for message in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true)
->inject('queueForEvents')
->inject('dbForProject')
->inject('dbForConsole')
->inject('project')
->inject('queueForMessaging')
->inject('response')
->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, ?string $description, ?string $title, ?string $body, ?array $data, ?string $action, ?string $icon, ?string $sound, ?string $color, ?string $tag, ?int $badge, ?string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Document $project, Messaging $queueForMessaging, Response $response) {
->action(function (string $messageId, ?array $topics, ?array $users, ?array $targets, ?string $description, ?string $title, ?string $body, ?array $data, ?string $action, ?string $icon, ?string $sound, ?string $color, ?string $tag, ?int $badge, ?string $status, ?string $scheduledAt, Event $queueForEvents, Database $dbForProject, Database $dbForConsole, Document $project, Messaging $queueForMessaging, Response $response) {
$message = $dbForProject->getDocument('messages', $messageId);
if ($message->isEmpty()) {
@ -2957,22 +3144,6 @@ App::patch('/v1/messaging/messages/push/:messageId')
}
if (!\is_null($targets)) {
$foundTargets = $dbForProject->find('targets', [
Query::equal('$id', $targets),
Query::equal('providerType', [MESSAGE_TYPE_PUSH]),
Query::limit(\count($targets)),
]);
if (\count($foundTargets) !== \count($targets)) {
throw new Exception(Exception::MESSAGE_TARGET_NOT_PUSH);
}
foreach ($foundTargets as $target) {
if ($target->isEmpty()) {
throw new Exception(Exception::USER_TARGET_NOT_FOUND);
}
}
$message->setAttribute('targets', $targets);
}
@ -3025,15 +3196,43 @@ App::patch('/v1/messaging/messages/push/:messageId')
}
if (!\is_null($scheduledAt)) {
$message->setAttribute('scheduledAt', $scheduledAt);
if (\is_null($message->getAttribute(('scheduleId')))) {
$schedule = $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION', 'default'),
'resourceType' => 'message',
'resourceCollection' => 'messages',
'resourceId' => $message->getId(),
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'active' => $status === 'processing',
]));
$message->setAttribute('scheduleId', $schedule->getId());
} else {
$schedule = $dbForConsole->getDocument('schedules', $message->getAttribute('scheduleId'));
if ($schedule->isEmpty()) {
throw new Exception(Exception::SCHEDULE_NOT_FOUND);
}
$schedule
->setAttribute('resourceUpdatedAt', DateTime::now())
->setAttribute('schedule', $scheduledAt)
->setAttribute('active', $status === 'processing');
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
}
$message->setAttribute('scheduleId', $schedule->getId());
}
$message = $dbForProject->updateDocument('messages', $message->getId(), $message);
if ($status === 'processing') {
if ($status === 'processing' && \is_null($message->getAttribute('scheduledAt'))) {
$queueForMessaging
->setMessageId($message->getId())
->setProject($project)
->trigger();
}

View file

@ -654,7 +654,6 @@ App::post('/v1/teams/:teamId/memberships')
->setMessage($messageDoc)
->setRecipients([$phone])
->setProviderType('SMS')
->setProject($project)
->trigger();
}
}

View file

@ -7,6 +7,7 @@ use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Extend\Exception;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Usage\Stats;
@ -97,6 +98,7 @@ App::init()
->inject('project')
->inject('user')
->inject('queueForEvents')
->inject('queueForMessaging')
->inject('queueForAudits')
->inject('queueForDeletes')
->inject('queueForDatabase')
@ -104,7 +106,7 @@ App::init()
->inject('mode')
->inject('queueForMails')
->inject('usage')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Database $dbForProject, string $mode, Mail $queueForMails, Stats $usage) use ($databaseListener) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Database $dbForProject, string $mode, Mail $queueForMails, Stats $usage) use ($databaseListener) {
$route = $utopia->getRoute();
@ -181,6 +183,9 @@ App::init()
->setProject($project)
->setUser($user);
$queueForMessaging
->setProject($project);
$queueForAudits
->setMode($mode)
->setUserAgent($request->getUserAgent(''))

View file

@ -90,17 +90,17 @@ const APP_MODE_DEFAULT = 'default';
const APP_MODE_ADMIN = 'admin';
const APP_PAGING_LIMIT = 12;
const APP_LIMIT_COUNT = 5000;
const APP_LIMIT_USERS = 10000;
const APP_LIMIT_USERS = 10_000;
const APP_LIMIT_USER_PASSWORD_HISTORY = 20;
const APP_LIMIT_USER_SESSIONS_MAX = 100;
const APP_LIMIT_USER_SESSIONS_DEFAULT = 10;
const APP_LIMIT_ANTIVIRUS = 20000000; //20MB
const APP_LIMIT_ENCRYPTION = 20000000; //20MB
const APP_LIMIT_COMPRESSION = 20000000; //20MB
const APP_LIMIT_ANTIVIRUS = 20_000_000; //20MB
const APP_LIMIT_ENCRYPTION = 20_000_000; //20MB
const APP_LIMIT_COMPRESSION = 20_000_000; //20MB
const APP_LIMIT_ARRAY_PARAMS_SIZE = 100; // Default maximum of how many elements can there be in API parameter that expects array value
const APP_LIMIT_ARRAY_ELEMENT_SIZE = 4096; // Default maximum length of element in array parameter represented by maximum URL length.
const APP_LIMIT_SUBQUERY = 1000;
const APP_LIMIT_SUBSCRIBERS_SUBQUERY = 1000000;
const APP_LIMIT_SUBSCRIBERS_SUBQUERY = 1_000_000;
const APP_LIMIT_WRITE_RATE_DEFAULT = 60; // Default maximum write rate per rate period
const APP_LIMIT_WRITE_RATE_PERIOD_DEFAULT = 60; // Default maximum write rate period in seconds
const APP_LIMIT_LIST_DEFAULT = 25; // Default maximum number of items to return in list API calls
@ -116,8 +116,8 @@ const APP_DATABASE_ATTRIBUTE_DATETIME = 'datetime';
const APP_DATABASE_ATTRIBUTE_URL = 'url';
const APP_DATABASE_ATTRIBUTE_INT_RANGE = 'intRange';
const APP_DATABASE_ATTRIBUTE_FLOAT_RANGE = 'floatRange';
const APP_DATABASE_ATTRIBUTE_STRING_MAX_LENGTH = 1073741824; // 2^32 bits / 4 bits per char
const APP_DATABASE_TIMEOUT_MILLISECONDS = 15000;
const APP_DATABASE_ATTRIBUTE_STRING_MAX_LENGTH = 1_073_741_824; // 2^32 bits / 4 bits per char
const APP_DATABASE_TIMEOUT_MILLISECONDS = 15_000;
const APP_STORAGE_UPLOADS = '/storage/uploads';
const APP_STORAGE_FUNCTIONS = '/storage/functions';
const APP_STORAGE_BUILDS = '/storage/builds';
@ -172,6 +172,7 @@ const DELETE_TYPE_CACHE_BY_TIMESTAMP = 'cacheByTimeStamp';
const DELETE_TYPE_CACHE_BY_RESOURCE = 'cacheByResource';
const DELETE_TYPE_SCHEDULES = 'schedules';
const DELETE_TYPE_TOPIC = 'topic';
const DELETE_TYPE_TARGET = 'target';
// Mail Types
const MAIL_TYPE_VERIFICATION = 'verification';
const MAIL_TYPE_MAGIC_SESSION = 'magicSession';
@ -552,15 +553,16 @@ Database::addFilter(
},
function (mixed $value, Document $document, Database $database) {
$targetIds = Authorization::skip(fn () => \array_map(
fn ($document) => $document->getAttribute('targetId'),
$database
->find('subscribers', [
fn ($document) => $document->getAttribute('targetInternalId'),
$database->find('subscribers', [
Query::equal('topicInternalId', [$document->getInternalId()]),
Query::limit(APP_LIMIT_SUBSCRIBERS_SUBQUERY)
])
));
if (\count($targetIds) > 0) {
return $database->find('targets', [Query::equal('$id', $targetIds)]);
return $database->find('targets', [
Query::equal('$internalId', $targetIds)
]);
}
return [];
}

View file

@ -633,10 +633,35 @@ services:
- _APP_LOGGING_PROVIDER
- _APP_LOGGING_CONFIG
appwrite-schedule:
appwrite-scheduler-functions:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: schedule
container_name: appwrite-schedule
entrypoint: schedule-functions
container_name: appwrite-scheduler-functions
<<: *x-logging
restart: unless-stopped
networks:
- appwrite
depends_on:
- mariadb
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
appwrite-scheduler-messages:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: schedule-messages
container_name: appwrite-scheduler-messages
<<: *x-logging
restart: unless-stopped
networks:

View file

@ -1,3 +0,0 @@
#!/bin/sh
php /usr/src/code/app/cli.php schedule $@

3
bin/schedule-functions Normal file
View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/cli.php schedule-functions $@

3
bin/schedule-messages Normal file
View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/cli.php schedule-messages $@

View file

@ -699,10 +699,37 @@ services:
- _APP_LOGGING_PROVIDER
- _APP_LOGGING_CONFIG
appwrite-schedule:
entrypoint: schedule
appwrite-scheduler-functions:
entrypoint: schedule-functions
<<: *x-logging
container_name: appwrite-schedule
container_name: appwrite-scheduler-functions
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- mariadb
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
appwrite-scheduler-messages:
entrypoint: schedule-messages
<<: *x-logging
container_name: appwrite-scheduler-messages
image: appwrite-dev
networks:
- appwrite

View file

@ -0,0 +1 @@
List the targets associated with a message as set via the targets attribute.

View file

@ -2,26 +2,26 @@
namespace Appwrite\Enum;
enum MessageStatus: string
class MessageStatus
{
/**
* Message that is not ready to be sent
*/
case Draft = 'draft';
public const DRAFT = 'draft';
/**
* Scheduled to be sent for a later time
*/
case Scheduled = 'scheduled';
public const SCHEDULED = 'scheduled';
/**
* Picked up by the worker and starting to send
*/
case Processing = 'processing';
public const PROCESSING = 'processing';
/**
* Sent without errors
*/
case Sent = 'sent';
public const SENT = 'sent';
/**
* Sent with some errors
*/
case Failed = 'failed';
public const FAILED = 'failed';
}

View file

@ -262,6 +262,10 @@ class Exception extends \Exception
public const MESSAGE_TARGET_NOT_EMAIL = 'message_target_not_email';
public const MESSAGE_TARGET_NOT_SMS = 'message_target_not_sms';
public const MESSAGE_TARGET_NOT_PUSH = 'message_target_not_push';
public const MESSAGE_MISSING_SCHEDULE = 'message_missing_schedule';
/** Schedules */
public const SCHEDULE_NOT_FOUND = 'schedule_not_found';
protected string $type = '';

View file

@ -2,26 +2,27 @@
namespace Appwrite\Platform\Services;
use Utopia\Platform\Service;
use Appwrite\Platform\Tasks\CalcTierStats;
use Appwrite\Platform\Tasks\DeleteOrphanedProjects;
use Appwrite\Platform\Tasks\DevGenerateTranslations;
use Appwrite\Platform\Tasks\Doctor;
use Appwrite\Platform\Tasks\GetMigrationStats;
use Appwrite\Platform\Tasks\Hamster;
use Appwrite\Platform\Tasks\Install;
use Appwrite\Platform\Tasks\Maintenance;
use Appwrite\Platform\Tasks\Migrate;
use Appwrite\Platform\Tasks\Schedule;
use Appwrite\Platform\Tasks\PatchRecreateRepositoriesDocuments;
use Appwrite\Platform\Tasks\SDKs;
use Appwrite\Platform\Tasks\Specs;
use Appwrite\Platform\Tasks\SSL;
use Appwrite\Platform\Tasks\Hamster;
use Appwrite\Platform\Tasks\ScheduleFunctions;
use Appwrite\Platform\Tasks\ScheduleMessages;
use Appwrite\Platform\Tasks\Specs;
use Appwrite\Platform\Tasks\Upgrade;
use Appwrite\Platform\Tasks\Usage;
use Appwrite\Platform\Tasks\Vars;
use Appwrite\Platform\Tasks\Version;
use Appwrite\Platform\Tasks\VolumeSync;
use Appwrite\Platform\Tasks\CalcTierStats;
use Appwrite\Platform\Tasks\Upgrade;
use Appwrite\Platform\Tasks\DeleteOrphanedProjects;
use Appwrite\Platform\Tasks\DevGenerateTranslations;
use Appwrite\Platform\Tasks\GetMigrationStats;
use Appwrite\Platform\Tasks\PatchRecreateRepositoriesDocuments;
use Utopia\Platform\Service;
class Tasks extends Service
{
@ -29,25 +30,26 @@ class Tasks extends Service
{
$this->type = self::TYPE_CLI;
$this
->addAction(Version::getName(), new Version())
->addAction(Usage::getName(), new Usage())
->addAction(Vars::getName(), new Vars())
->addAction(SSL::getName(), new SSL())
->addAction(Hamster::getName(), new Hamster())
->addAction(Doctor::getName(), new Doctor())
->addAction(Install::getName(), new Install())
->addAction(Upgrade::getName(), new Upgrade())
->addAction(Maintenance::getName(), new Maintenance())
->addAction(Schedule::getName(), new Schedule())
->addAction(Migrate::getName(), new Migrate())
->addAction(SDKs::getName(), new SDKs())
->addAction(VolumeSync::getName(), new VolumeSync())
->addAction(Specs::getName(), new Specs())
->addAction(CalcTierStats::getName(), new CalcTierStats())
->addAction(DeleteOrphanedProjects::getName(), new DeleteOrphanedProjects())
->addAction(PatchRecreateRepositoriesDocuments::getName(), new PatchRecreateRepositoriesDocuments())
->addAction(GetMigrationStats::getName(), new GetMigrationStats())
->addAction(DevGenerateTranslations::getName(), new DevGenerateTranslations())
->addAction(Doctor::getName(), new Doctor())
->addAction(GetMigrationStats::getName(), new GetMigrationStats())
->addAction(Hamster::getName(), new Hamster())
->addAction(Install::getName(), new Install())
->addAction(Maintenance::getName(), new Maintenance())
->addAction(Migrate::getName(), new Migrate())
->addAction(PatchRecreateRepositoriesDocuments::getName(), new PatchRecreateRepositoriesDocuments())
->addAction(SDKs::getName(), new SDKs())
->addAction(SSL::getName(), new SSL())
->addAction(ScheduleFunctions::getName(), new ScheduleFunctions())
->addAction(ScheduleMessages::getName(), new ScheduleMessages())
->addAction(Specs::getName(), new Specs())
->addAction(Upgrade::getName(), new Upgrade())
->addAction(Usage::getName(), new Usage())
->addAction(Vars::getName(), new Vars())
->addAction(Version::getName(), new Version())
->addAction(VolumeSync::getName(), new VolumeSync())
;
}

View file

@ -22,16 +22,16 @@ class Workers extends Service
$this->type = self::TYPE_WORKER;
$this
->addAction(Audits::getName(), new Audits())
->addAction(Webhooks::getName(), new Webhooks())
->addAction(Mails::getName(), new Mails())
->addAction(Messaging::getName(), new Messaging())
->addAction(Builds::getName(), new Builds())
->addAction(Certificates::getName(), new Certificates())
->addAction(Databases::getName(), new Databases())
->addAction(Functions::getName(), new Functions())
->addAction(Builds::getName(), new Builds())
->addAction(Deletes::getName(), new Deletes())
->addAction(Migrations::getName(), new Migrations())
->addAction(Functions::getName(), new Functions())
->addAction(Hamster::getName(), new Hamster())
->addAction(Mails::getName(), new Mails())
->addAction(Messaging::getName(), new Messaging())
->addAction(Migrations::getName(), new Migrations())
->addAction(Webhooks::getName(), new Webhooks())
;
}

View file

@ -61,7 +61,7 @@ class Maintenance extends Action
private function notifyDeleteExecutionLogs(int $interval, Delete $queueForDeletes): void
{
($queueForDeletes)
$queueForDeletes
->setType(DELETE_TYPE_EXECUTIONS)
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
@ -69,7 +69,7 @@ class Maintenance extends Action
private function notifyDeleteAbuseLogs(int $interval, Delete $queueForDeletes): void
{
($queueForDeletes)
$queueForDeletes
->setType(DELETE_TYPE_ABUSE)
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
@ -77,7 +77,7 @@ class Maintenance extends Action
private function notifyDeleteAuditLogs(int $interval, Delete $queueForDeletes): void
{
($queueForDeletes)
$queueForDeletes
->setType(DELETE_TYPE_AUDIT)
->setDatetime(DateTime::addSeconds(new \DateTime(), -1 * $interval))
->trigger();
@ -85,7 +85,7 @@ class Maintenance extends Action
private function notifyDeleteUsageStats(int $usageStatsRetentionHourly, Delete $queueForDeletes): void
{
($queueForDeletes)
$queueForDeletes
->setType(DELETE_TYPE_USAGE)
->setUsageRetentionHourlyDateTime(DateTime::addSeconds(new \DateTime(), -1 * $usageStatsRetentionHourly))
->trigger();
@ -93,7 +93,7 @@ class Maintenance extends Action
private function notifyDeleteConnections(Delete $queueForDeletes): void
{
($queueForDeletes)
$queueForDeletes
->setType(DELETE_TYPE_REALTIME)
->setDatetime(DateTime::addSeconds(new \DateTime(), -60))
->trigger();
@ -101,7 +101,7 @@ class Maintenance extends Action
private function notifyDeleteExpiredSessions(Delete $queueForDeletes): void
{
($queueForDeletes)
$queueForDeletes
->setType(DELETE_TYPE_SESSIONS)
->trigger();
}

View file

@ -1,244 +0,0 @@
<?php
namespace Appwrite\Platform\Tasks;
use Cron\CronExpression;
use Swoole\Timer;
use Utopia\App;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Appwrite\Event\Func;
use function Swoole\Coroutine\run;
class Schedule extends Action
{
public const FUNCTION_UPDATE_TIMER = 10; //seconds
public const FUNCTION_ENQUEUE_TIMER = 60; //seconds
public static function getName(): string
{
return 'schedule';
}
public function __construct()
{
$this
->desc('Execute functions scheduled in Appwrite')
->inject('pools')
->inject('dbForConsole')
->inject('getProjectDB')
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
}
/**
* 1. Load all documents from 'schedules' collection to create local copy
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker.
*/
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
Console::title('Scheduler V1');
Console::success(APP_NAME . ' Scheduler v1 has started');
/**
* Extract only nessessary attributes to lower memory used.
*
* @var Document $schedule
* @return array
*/
$getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array {
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId'));
$function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId'));
return [
'resourceId' => $schedule->getAttribute('resourceId'),
'schedule' => $schedule->getAttribute('schedule'),
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
'function' => $function, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
];
};
$schedules = []; // Local copy of 'schedules' collection
$lastSyncUpdate = DateTime::now();
$limit = 10000;
$sum = $limit;
$total = 0;
$loadStart = \microtime(true);
$latestDocument = null;
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
if ($latestDocument !== null) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
Query::equal('resourceType', ['function']),
Query::equal('active', [true]),
]));
$sum = count($results);
$total = $total + $sum;
foreach ($results as $document) {
try {
$schedules[$document['resourceId']] = $getSchedule($document);
} catch (\Throwable $th) {
Console::error("Failed to load schedule for project {$document['projectId']} and function {$document['resourceId']}");
Console::error($th->getMessage());
}
}
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
}
$pools->reclaim();
Console::success("{$total} functions were loaded in " . (microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
run(
function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);
$limit = 1000;
$sum = $limit;
$total = 0;
$latestDocument = null;
Console::log("Sync tick: Running at $time");
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
if ($latestDocument !== null) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
Query::equal('resourceType', ['function']),
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
]));
$sum = count($results);
$total = $total + $sum;
foreach ($results as $document) {
$localDocument = $schedules[$document['resourceId']] ?? null;
$org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null;
$new = strtotime($document['resourceUpdatedAt']);
if ($document['active'] === false) {
Console::info("Removing: {$document['resourceId']}");
unset($schedules[$document['resourceId']]);
} elseif ($new !== $org) {
Console::info("Updating: {$document['resourceId']}");
$schedules[$document['resourceId']] = $getSchedule($document);
}
}
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
}
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$pools->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
/**
* The timer to prepare soon-to-execute schedules.
*/
$lastEnqueueUpdate = null;
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate, $pools) {
$timerStart = \microtime(true);
$time = DateTime::now();
$enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate;
$timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff);
Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");
$total = 0;
$delayedExecutions = []; // Group executions with same delay to share one coroutine
foreach ($schedules as $key => $schedule) {
$cron = new CronExpression($schedule['schedule']);
$nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate);
$currentTick = $next < $timeFrame;
if (!$currentTick) {
continue;
}
$total++;
$promiseStart = \time(); // in seconds
$executionStart = $nextDate->getTimestamp(); // in seconds
$delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
if (!isset($delayedExecutions[$delay])) {
$delayedExecutions[$delay] = [];
}
$delayedExecutions[$delay][] = $key;
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $schedules, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
if (!isset($schedules[$scheduleKey])) {
return;
}
$schedule = $schedules[$scheduleKey];
$functions = new Func($connection);
$functions
->setType('schedule')
->setFunction($schedule['function'])
->setMethod('POST')
->setPath('/')
->setProject($schedule['project'])
->trigger();
}
$queue->reclaim();
});
}
$timerEnd = \microtime(true);
$lastEnqueueUpdate = $timerStart;
Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds");
};
Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions());
$enqueueFunctions();
}
);
}
}

View file

@ -0,0 +1,190 @@
<?php
namespace Appwrite\Platform\Tasks;
use Cron\CronExpression;
use Swoole\Timer;
use Utopia\App;
use Utopia\Database\Exception;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Appwrite\Event\Func;
use function Swoole\Coroutine\run;
abstract class ScheduleBase extends Action
{
protected const UPDATE_TIMER = 10; //seconds
protected const ENQUEUE_TIMER = 60; //seconds
protected array $schedules = [];
abstract public static function getName(): string;
abstract public static function getSupportedResource(): string;
abstract protected function enqueueResources(
Group $pools,
Database $dbForConsole
);
public function __construct()
{
$type = static::getSupportedResource();
$this
->desc("Execute {$type}s scheduled in Appwrite")
->inject('pools')
->inject('dbForConsole')
->inject('getProjectDB')
->callback(fn(Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
}
/**
* 1. Load all documents from 'schedules' collection to create local copy
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
/**
* Extract only necessary attributes to lower memory used.
*
* @return array
* @throws Exception
* @var Document $schedule
*/
$getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array {
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId'));
$resource = $getProjectDB($project)->getDocument(
$schedule->getAttribute('resourceCollection'),
$schedule->getAttribute('resourceId')
);
return [
'$id' => $schedule->getId(),
'resourceId' => $schedule->getAttribute('resourceId'),
'schedule' => $schedule->getAttribute('schedule'),
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
'resource' => $resource, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
];
};
$lastSyncUpdate = DateTime::now();
$limit = 10_000;
$sum = $limit;
$total = 0;
$loadStart = \microtime(true);
$latestDocument = null;
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
if ($latestDocument) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
Query::equal('resourceType', [static::getSupportedResource()]),
Query::equal('active', [true]),
]));
$sum = \count($results);
$total = $total + $sum;
foreach ($results as $document) {
try {
$this->schedules[$document['resourceId']] = $getSchedule($document);
} catch (\Throwable $th) {
Console::error("Failed to load schedule for project {$document['projectId']} {$document['resourceCollection']} {$document['resourceId']}");
Console::error($th->getMessage());
}
}
$latestDocument = \end($results);
}
$pools->reclaim();
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);
$limit = 1000;
$sum = $limit;
$total = 0;
$latestDocument = null;
Console::log("Sync tick: Running at $time");
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
if ($latestDocument) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
Query::equal('resourceType', [static::getSupportedResource()]),
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
]));
$sum = count($results);
$total = $total + $sum;
foreach ($results as $document) {
$localDocument = $schedules[$document['resourceId']] ?? null;
// Check if resource has been updated since last sync
$org = $localDocument !== null ? \strtotime($localDocument['resourceUpdatedAt']) : null;
$new = \strtotime($document['resourceUpdatedAt']);
if (!$document['active']) {
Console::info("Removing: {$document['resourceId']}");
unset($this->schedules[$document['resourceId']]);
} elseif ($new !== $org) {
Console::info("Updating: {$document['resourceId']}");
$this->schedules[$document['resourceId']] = $getSchedule($document);
}
}
$latestDocument = \end($results);
}
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$pools->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn() => $this->enqueueResources($pools, $dbForConsole)
);
$this->enqueueResources($pools, $dbForConsole);
});
}
}

View file

@ -0,0 +1,104 @@
<?php
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Cron\CronExpression;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Pools\Group;
class ScheduleFunctions extends ScheduleBase
{
public const UPDATE_TIMER = 10; // seconds
public const ENQUEUE_TIMER = 60; // seconds
private ?float $lastEnqueueUpdate = null;
public static function getName(): string
{
return 'schedule-functions';
}
public static function getSupportedResource(): string
{
return 'function';
}
protected function enqueueResources(Group $pools, Database $dbForConsole): void
{
$timerStart = \microtime(true);
$time = DateTime::now();
$enqueueDiff = $this->lastEnqueueUpdate === null ? 0 : $timerStart - $this->lastEnqueueUpdate;
$timeFrame = DateTime::addSeconds(new \DateTime(), static::ENQUEUE_TIMER - $enqueueDiff);
Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");
$total = 0;
$delayedExecutions = []; // Group executions with same delay to share one coroutine
foreach ($this->schedules as $key => $schedule) {
$cron = new CronExpression($schedule['schedule']);
$nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate);
$currentTick = $next < $timeFrame;
if (!$currentTick) {
continue;
}
$total++;
$promiseStart = \time(); // in seconds
$executionStart = $nextDate->getTimestamp(); // in seconds
$delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
if (!isset($delayedExecutions[$delay])) {
$delayedExecutions[$delay] = [];
}
$delayedExecutions[$delay][] = $key;
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
if (!\array_key_exists($scheduleKey, $this->schedules)) {
return;
}
$schedule = $this->schedules[$scheduleKey];
$queueForFunctions = new Func($connection);
$queueForFunctions
->setType('schedule')
->setFunction($schedule['resource'])
->setMethod('POST')
->setPath('/')
->setProject($schedule['project'])
->trigger();
}
$queue->reclaim();
});
}
$timerEnd = \microtime(true);
// TODO: This was a bug before because it wasn't passed by reference, enabling it breaks scheduling
//$this->lastEnqueueUpdate = $timerStart;
Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds");
}
}

View file

@ -0,0 +1,71 @@
<?php
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Delete;
use Swoole\Timer;
use Utopia\Database\Document;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Query;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Appwrite\Event\Messaging;
use function Swoole\Coroutine\run;
class ScheduleMessages extends ScheduleBase
{
public const UPDATE_TIMER = 10; // seconds
public const ENQUEUE_TIMER = 60; // seconds
public static function getName(): string
{
return 'schedule-messages';
}
public static function getSupportedResource(): string
{
return 'message';
}
protected function enqueueResources(Group $pools, Database $dbForConsole): void
{
foreach ($this->schedules as $schedule) {
$now = DateTime::now();
$scheduledAt = DateTime::formatTz($schedule['schedule']);
if ($scheduledAt > $now) {
continue;
}
\go(function () use ($schedule, $pools, $dbForConsole) {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);
$queueForDeletes = new Delete($connection);
$queueForMessaging
->setMessageId($schedule['resourceId'])
->setProject($schedule['project'])
->trigger();
$dbForConsole->updateDocument(
'schedules',
$schedule['$id'],
new Document(['active' => false])
);
$queueForDeletes
->setType(DELETE_TYPE_SCHEDULES)
->setDocument($schedule)
->trigger();
$queue->reclaim();
unset($this->schedules[$schedule['resourceId']]);
});
}
}
}

View file

@ -20,6 +20,7 @@ use Utopia\Database\Exception\Authorization;
use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Exception as DatabaseException;
use Utopia\Database\Query;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
@ -153,11 +154,14 @@ class Deletes extends Action
$this->deleteCacheByDate($project, $getProjectDB, $datetime);
break;
case DELETE_TYPE_SCHEDULES:
$this->deleteSchedules($dbForConsole, $getProjectDB, $datetime);
$this->deleteSchedules($dbForConsole, $getProjectDB, $datetime, $document);
break;
case DELETE_TYPE_TOPIC:
$this->deleteTopic($project, $getProjectDB, $document);
break;
case DELETE_TYPE_TARGET:
$this->deleteTarget($project, $getProjectDB, $document);
break;
default:
throw new \Exception('No delete operation for type: ' . \strval($type));
break;
@ -168,17 +172,21 @@ class Deletes extends Action
* @param Database $dbForConsole
* @param callable $getProjectDB
* @param string $datetime
* @param Document|null $document
* @return void
* @throws Authorization
* @throws Throwable
* @throws Conflict
* @throws Restricted
* @throws Structure
* @throws DatabaseException
*/
private function deleteSchedules(Database $dbForConsole, callable $getProjectDB, string $datetime): void
private function deleteSchedules(Database $dbForConsole, callable $getProjectDB, string $datetime, ?Document $document = null): void
{
$this->listByGroup(
'schedules',
[
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
Query::equal('resourceType', ['function']),
Query::equal('resourceType', [$document->getAttribute('resourceType')]),
Query::lessThanEqual('resourceUpdatedAt', $datetime),
Query::equal('active', [false]),
],
@ -192,11 +200,22 @@ class Deletes extends Action
return;
}
$function = $getProjectDB($project)->getDocument('functions', $document->getAttribute('resourceId'));
$resource = $getProjectDB($project)->getDocument(
$document->getAttribute('resourceCollection'),
$document->getAttribute('resourceId')
);
if ($function->isEmpty()) {
$delete = true;
switch ($document->getAttribute('resourceType')) {
case 'function':
$delete = $resource->isEmpty();
break;
}
if ($delete) {
$dbForConsole->deleteDocument('schedules', $document->getId());
Console::success('Deleting schedule for function ' . $document->getAttribute('resourceId'));
Console::success('Deleting schedule for ' . $document->getAttribute('resourceType') . ' ' . $document->getAttribute('resourceId'));
}
}
);
@ -221,6 +240,35 @@ class Deletes extends Action
], $dbForProject);
}
/**
* @param Document $project
* @param callable $getProjectDB
* @param Document $target
* @throws Exception
*/
protected function deleteTarget(Document $project, callable $getProjectDB, Document $target)
{
/** @var Database */
$dbForProject = $getProjectDB($project);
// Delete subscribers and decrement topic counts
$this->deleteByGroup(
'subscribers',
[
Query::equal('targetInternalId', [$target->getInternalId()])
],
$dbForProject,
function (Document $subscriber) use ($dbForProject) {
$topicId = $subscriber->getAttribute('topicId');
$topicInternalId = $subscriber->getAttribute('topicInternalId');
$topic = $dbForProject->getDocument('topics', $topicId);
if (!$topic->isEmpty() && $topic->getInternalId() === $topicInternalId) {
$dbForProject->decreaseDocumentAttribute('topics', $topicId, 'total', min: 0);
}
}
);
}
/**
* @param Document $project
* @param callable $getProjectDB
@ -563,9 +611,16 @@ class Deletes extends Action
], $dbForProject);
// Delete targets
$this->deleteByGroup('targets', [
Query::equal('userInternalId', [$userInternalId])
], $dbForProject);
$this->listByGroup(
'targets',
[
Query::equal('userInternalId', [$userInternalId])
],
$dbForProject,
function (Document $target) use ($getProjectDB, $project) {
$this->deleteTarget($project, $getProjectDB, $target);
}
);
}
/**

View file

@ -2,6 +2,7 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Enum\MessageStatus;
use Appwrite\Extend\Exception;
use Utopia\App;
use Utopia\CLI\Console;
@ -38,7 +39,7 @@ class Messaging extends Action
{
public static function getName(): string
{
return "messaging";
return 'messaging';
}
/**
@ -69,10 +70,13 @@ class Messaging extends Action
throw new \Exception('Payload not found.');
}
if (!\is_null($payload['message']) && !\is_null($payload['recipients'])) {
if ($payload['providerType'] === MESSAGE_TYPE_SMS) {
$this->processInternalSMSMessage($log, new Document($payload['message']), $payload['recipients']);
}
if (
!\is_null($payload['message'])
&& !\is_null($payload['recipients'])
&& $payload['providerType'] === MESSAGE_TYPE_SMS
) {
// Message was triggered internally
$this->processInternalSMSMessage($log, new Document($payload['message']), $payload['recipients']);
} else {
$message = $dbForProject->getDocument('messages', $payload['messageId']);
@ -82,85 +86,124 @@ class Messaging extends Action
private function processMessage(Database $dbForProject, Document $message): void
{
$topicsId = $message->getAttribute('topics', []);
$targetsId = $message->getAttribute('targets', []);
$usersId = $message->getAttribute('users', []);
$topicIds = $message->getAttribute('topics', []);
$targetIds = $message->getAttribute('targets', []);
$userIds = $message->getAttribute('users', []);
/**
* @var Document[] $recipients
* @var array<Document> $recipients
*/
$recipients = [];
if (\count($topicsId) > 0) {
$topics = $dbForProject->find('topics', [Query::equal('$id', $topicsId)]);
if (\count($topicIds) > 0) {
$topics = $dbForProject->find('topics', [
Query::equal('$id', $topicIds),
Query::limit(\count($topicIds)),
]);
foreach ($topics as $topic) {
$targets = \array_filter($topic->getAttribute('targets'), fn(Document $target) => $target->getAttribute('providerType') === $message->getAttribute('providerType'));
$targets = \array_filter($topic->getAttribute('targets'), fn(Document $target) =>
$target->getAttribute('providerType') === $message->getAttribute('providerType'));
$recipients = \array_merge($recipients, $targets);
}
}
if (\count($usersId) > 0) {
$users = $dbForProject->find('users', [Query::equal('$id', $usersId)]);
if (\count($userIds) > 0) {
$users = $dbForProject->find('users', [
Query::equal('$id', $userIds),
Query::limit(\count($userIds)),
]);
foreach ($users as $user) {
$targets = \array_filter($user->getAttribute('targets'), fn(Document $target) => $target->getAttribute('providerType') === $message->getAttribute('providerType'));
$targets = \array_filter($user->getAttribute('targets'), fn(Document $target) =>
$target->getAttribute('providerType') === $message->getAttribute('providerType'));
$recipients = \array_merge($recipients, $targets);
}
}
if (\count($targetsId) > 0) {
$targets = $dbForProject->find('targets', [Query::equal('$id', $targetsId)]);
if (\count($targetIds) > 0) {
$targets = $dbForProject->find('targets', [
Query::equal('$id', $targetIds),
Query::limit(\count($targetIds)),
]);
$targets = \array_filter($targets, fn(Document $target) =>
$target->getAttribute('providerType') === $message->getAttribute('providerType'));
$recipients = \array_merge($recipients, $targets);
}
$primaryProvider = $dbForProject->findOne('providers', [
if (empty($recipients)) {
$dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([
'status' => MessageStatus::FAILED,
'deliveryErrors' => ['No valid recipients found.']
]));
Console::warning('No valid recipients found.');
return;
}
$fallback = $dbForProject->findOne('providers', [
Query::equal('enabled', [true]),
Query::equal('type', [$recipients[0]->getAttribute('providerType')]),
]);
if ($fallback === false || $fallback->isEmpty()) {
$dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([
'status' => MessageStatus::FAILED,
'deliveryErrors' => ['No fallback provider found.']
]));
Console::warning('No fallback provider found.');
return;
}
/**
* @var array<string, array<string>> $identifiersByProviderId
* @var array<string, array<string>> $identifiers
*/
$identifiersByProviderId = [];
$identifiers = [];
/**
* @var Document[] $providers
*/
$providers = [
$primaryProvider->getId() => $primaryProvider
$fallback->getId() => $fallback
];
foreach ($recipients as $recipient) {
$providerId = $recipient->getAttribute('providerId');
if (!$providerId && $primaryProvider instanceof Document && !$primaryProvider->isEmpty()) {
$providerId = $primaryProvider->getId();
if (
!$providerId
&& $fallback instanceof Document
&& !$fallback->isEmpty()
&& $fallback->getAttribute('enabled')
) {
$providerId = $fallback->getId();
}
if ($providerId) {
if (!isset($identifiersByProviderId[$providerId])) {
$identifiersByProviderId[$providerId] = [];
if (!\array_key_exists($providerId, $identifiers)) {
$identifiers[$providerId] = [];
}
$identifiersByProviderId[$providerId][] = $recipient->getAttribute('identifier');
$identifiers[$providerId][] = $recipient->getAttribute('identifier');
}
}
/**
* @var array[] $results
* @var array<array> $results
*/
$results = batch(\array_map(function ($providerId) use ($identifiersByProviderId, $providers, $primaryProvider, $message, $dbForProject) {
return function () use ($providerId, $identifiersByProviderId, $providers, $primaryProvider, $message, $dbForProject) {
$results = batch(\array_map(function ($providerId) use ($identifiers, $providers, $fallback, $message, $dbForProject) {
return function () use ($providerId, $identifiers, $providers, $fallback, $message, $dbForProject) {
if (\array_key_exists($providerId, $providers)) {
$provider = $providers[$providerId];
} else {
$provider = $dbForProject->getDocument('providers', $providerId, [Query::equal('enabled', [true])]);
$provider = $dbForProject->getDocument('providers', $providerId);
if ($provider->isEmpty()) {
$provider = $primaryProvider;
if ($provider->isEmpty() || !$provider->getAttribute('enabled')) {
$provider = $fallback;
} else {
$providers[$providerId] = $provider;
}
}
$identifiers = $identifiersByProviderId[$providerId];
$identifiers = $identifiers[$providerId];
$adapter = match ($provider->getAttribute('type')) {
MESSAGE_TYPE_SMS => $this->sms($provider),
@ -200,7 +243,10 @@ class Messaging extends Action
// Deleting push targets when token has expired.
if ($detail['error'] === 'Expired device token.') {
$target = $dbForProject->findOne('targets', [Query::equal('identifier', [$detail['recipient']])]);
$target = $dbForProject->findOne('targets', [
Query::equal('identifier', [$detail['recipient']])
]);
if ($target instanceof Document && !$target->isEmpty()) {
$dbForProject->deleteDocument('targets', $target->getId());
}
@ -210,6 +256,7 @@ class Messaging extends Action
$deliveryErrors[] = 'Failed sending to targets ' . $batchIndex + 1 . '-' . \count($batch) . ' with error: ' . $e->getMessage();
} finally {
$batchIndex++;
return [
'deliveredTotal' => $deliveredTotal,
'deliveryErrors' => $deliveryErrors,
@ -218,7 +265,7 @@ class Messaging extends Action
};
}, $batches));
};
}, \array_keys($identifiersByProviderId)));
}, \array_keys($identifiers)));
$results = array_merge(...$results);
@ -233,9 +280,9 @@ class Messaging extends Action
$message->setAttribute('deliveryErrors', $deliveryErrors);
if (\count($message->getAttribute('deliveryErrors')) > 0) {
$message->setAttribute('status', 'failed');
$message->setAttribute('status', MessageStatus::FAILED);
} else {
$message->setAttribute('status', 'sent');
$message->setAttribute('status', MessageStatus::SENT);
}
$message->removeAttribute('to');
@ -253,7 +300,7 @@ class Messaging extends Action
private function processInternalSMSMessage(Log $log, Document $message, array $recipients): void
{
if (empty(App::getEnv('_APP_SMS_PROVIDER')) || empty(App::getEnv('_APP_SMS_FROM'))) {
throw new \Exception('Skipped SMS processing. No Phone configuration has been set.');
throw new \Exception('Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.');
}
$smsDSN = new DSN(App::getEnv('_APP_SMS_PROVIDER'));
@ -384,14 +431,20 @@ class Messaging extends Action
$bcc = [];
if (\count($ccTargets) > 0) {
$ccTargets = $dbForProject->find('targets', [Query::equal('identifier', $ccTargets)]);
$ccTargets = $dbForProject->find('targets', [
Query::equal('$id', $ccTargets),
Query::limit(\count($ccTargets)),
]);
foreach ($ccTargets as $ccTarget) {
$cc[] = ['email' => $ccTarget['identifier']];
}
}
if (\count($bccTargets) > 0) {
$bccTargets = $dbForProject->find('targets', [Query::equal('identifier', $bccTargets)]);
$bccTargets = $dbForProject->find('targets', [
Query::equal('$id', $bccTargets),
Query::limit(\count($bccTargets)),
]);
foreach ($bccTargets as $bccTarget) {
$bcc[] = ['email' => $bccTarget['identifier']];
}

View file

@ -5,16 +5,12 @@ namespace Appwrite\Utopia\Database\Validator\Queries;
class Messages extends Base
{
public const ALLOWED_ATTRIBUTES = [
'topics',
'users',
'targets',
'providerId',
'scheduledAt',
'deliveredAt',
'deliveredTo',
'deliveryErrors',
'deliveredTotal',
'status',
'description',
'data'
'providerType',
];
/**

View file

@ -8,6 +8,7 @@ class Targets extends Base
'userId',
'providerId',
'identifier',
'providerType',
];
/**

View file

@ -1860,8 +1860,8 @@ trait Base
}
}';
case self::$CREATE_FCM_PROVIDER:
return 'mutation createFcmProvider($providerId: String!, $name: String!, $serviceAccountJSON: Json) {
messagingCreateFcmProvider(providerId: $providerId, name: $name, serviceAccountJSON: $serviceAccountJSON) {
return 'mutation createFCMProvider($providerId: String!, $name: String!, $serviceAccountJSON: Json) {
messagingCreateFCMProvider(providerId: $providerId, name: $name, serviceAccountJSON: $serviceAccountJSON) {
_id
name
provider
@ -1870,8 +1870,8 @@ trait Base
}
}';
case self::$CREATE_APNS_PROVIDER:
return 'mutation createApnsProvider($providerId: String!, $name: String!, $authKey: String!, $authKeyId: String!, $teamId: String!, $bundleId: String!) {
messagingCreateApnsProvider(providerId: $providerId, name: $name, authKey: $authKey, authKeyId: $authKeyId, teamId: $teamId, bundleId: $bundleId) {
return 'mutation createAPNSProvider($providerId: String!, $name: String!, $authKey: String!, $authKeyId: String!, $teamId: String!, $bundleId: String!) {
messagingCreateAPNSProvider(providerId: $providerId, name: $name, authKey: $authKey, authKeyId: $authKeyId, teamId: $teamId, bundleId: $bundleId) {
_id
name
provider
@ -1974,8 +1974,8 @@ trait Base
}
}';
case self::$UPDATE_FCM_PROVIDER:
return 'mutation updateFcmProvider($providerId: String!, $name: String!, $serviceAccountJSON: Json) {
messagingUpdateFcmProvider(providerId: $providerId, name: $name, serviceAccountJSON: $serviceAccountJSON) {
return 'mutation updateFCMProvider($providerId: String!, $name: String!, $serviceAccountJSON: Json) {
messagingUpdateFCMProvider(providerId: $providerId, name: $name, serviceAccountJSON: $serviceAccountJSON) {
_id
name
provider
@ -1984,8 +1984,8 @@ trait Base
}
}';
case self::$UPDATE_APNS_PROVIDER:
return 'mutation updateApnsProvider($providerId: String!, $name: String!, $authKey: String!, $authKeyId: String!, $teamId: String!, $bundleId: String!) {
messagingUpdateApnsProvider(providerId: $providerId, name: $name, authKey: $authKey, authKeyId: $authKeyId, teamId: $teamId, bundleId: $bundleId) {
return 'mutation updateAPNSProvider($providerId: String!, $name: String!, $authKey: String!, $authKeyId: String!, $teamId: String!, $bundleId: String!) {
messagingUpdateAPNSProvider(providerId: $providerId, name: $name, authKey: $authKey, authKeyId: $authKeyId, teamId: $teamId, bundleId: $bundleId) {
_id
name
provider

View file

@ -70,7 +70,7 @@ class MessagingTest extends Scope
'apiSecret' => 'my-apisecret',
'from' => '+123456789',
],
'Fcm' => [
'FCM' => [
'providerId' => ID::unique(),
'name' => 'FCM1',
'serviceAccountJSON' => [
@ -80,7 +80,7 @@ class MessagingTest extends Scope
"private_key" => "test-private-key",
]
],
'Apns' => [
'APNS' => [
'providerId' => ID::unique(),
'name' => 'APNS1',
'authKey' => 'my-authkey',
@ -159,7 +159,7 @@ class MessagingTest extends Scope
'apiKey' => 'my-apikey',
'apiSecret' => 'my-apisecret',
],
'Fcm' => [
'FCM' => [
'providerId' => $providers[7]['_id'],
'name' => 'FCM2',
'serviceAccountJSON' => [
@ -169,7 +169,7 @@ class MessagingTest extends Scope
'private_key' => "test-private-key",
]
],
'Apns' => [
'APNS' => [
'providerId' => $providers[8]['_id'],
'name' => 'APNS2',
'authKey' => 'my-authkey',
@ -998,7 +998,7 @@ class MessagingTest extends Scope
$this->assertEquals(200, $provider['headers']['status-code']);
$providerId = $provider['body']['data']['messagingCreateFcmProvider']['_id'];
$providerId = $provider['body']['data']['messagingCreateFCMProvider']['_id'];
$query = $this->getQuery(self::$CREATE_TOPIC);
$graphQLPayload = [

View file

@ -2,10 +2,10 @@
namespace Tests\E2E\Services\Messaging;
use Appwrite\Enum\MessageStatus;
use Tests\E2E\Client;
use Utopia\App;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Query;
use Utopia\DSN\DSN;
trait MessagingBase
@ -418,6 +418,12 @@ trait MessagingBase
*/
public function testListSubscribers(array $data)
{
$subscriberId = $data['subscriberId'];
$targetId = $data['targetId'];
$userId = $data['userId'];
$providerType = $data['providerType'];
$identifier = $data['identifier'];
$response = $this->client->call(Client::METHOD_GET, '/messaging/topics/' . $data['topicId'] . '/subscribers', \array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
@ -426,11 +432,41 @@ trait MessagingBase
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertEquals(1, $response['body']['total']);
$this->assertEquals($data['userId'], $response['body']['subscribers'][0]['target']['userId']);
$this->assertEquals($data['providerType'], $response['body']['subscribers'][0]['target']['providerType']);
$this->assertEquals($data['identifier'], $response['body']['subscribers'][0]['target']['identifier']);
$this->assertEquals($userId, $response['body']['subscribers'][0]['target']['userId']);
$this->assertEquals($providerType, $response['body']['subscribers'][0]['target']['providerType']);
$this->assertEquals($identifier, $response['body']['subscribers'][0]['target']['identifier']);
$this->assertEquals(\count($response['body']['subscribers']), $response['body']['total']);
$response = $this->client->call(Client::METHOD_GET, '/messaging/topics/' . $data['topicId'] . '/subscribers', \array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]), [
'search' => 'DOES_NOT_EXIST',
]);
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertEquals(0, $response['body']['total']);
$searches = [
$subscriberId,
$targetId,
$userId,
$providerType
];
foreach ($searches as $search) {
$response = $this->client->call(Client::METHOD_GET, '/messaging/topics/' . $data['topicId'] . '/subscribers', \array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]), [
'search' => $search,
]);
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertEquals(1, $response['body']['total']);
}
return $data;
}
@ -581,6 +617,47 @@ trait MessagingBase
$this->assertEquals(204, $response['headers']['status-code']);
}
public function testCreateDraftEmail()
{
// Create User
$response = $this->client->call(Client::METHOD_POST, '/users', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
], [
'userId' => ID::unique(),
'email' => uniqid() . "@example.com",
'password' => 'password',
'name' => 'Messaging User',
]);
$this->assertEquals(201, $response['headers']['status-code'], "Error creating user: " . var_export($response['body'], true));
$user = $response['body'];
$this->assertEquals(1, \count($user['targets']));
$targetId = $user['targets'][0]['$id'];
// Create Email
$response = $this->client->call(Client::METHOD_POST, '/messaging/messages/email', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
], [
'messageId' => ID::unique(),
'targets' => [$targetId],
'subject' => 'New blog post',
'content' => 'Check out the new blog post at http://localhost',
]);
$this->assertEquals(201, $response['headers']['status-code']);
$message = $response['body'];
$this->assertEquals(MessageStatus::DRAFT, $message['status']);
return $message;
}
public function testSendEmail()
{
if (empty(App::getEnv('_APP_MESSAGE_EMAIL_TEST_DSN'))) {
@ -604,10 +681,11 @@ trait MessagingBase
'x-appwrite-key' => $this->getProject()['apiKey'],
]), [
'providerId' => ID::unique(),
'name' => 'Mailgun-provider',
'name' => 'Sendgrid-provider',
'apiKey' => $apiKey,
'fromName' => $fromName,
'fromEmail' => $fromEmail
'fromEmail' => $fromEmail,
'enabled' => true,
]);
$this->assertEquals(201, $provider['headers']['status-code']);
@ -639,13 +717,17 @@ trait MessagingBase
$this->assertEquals(201, $user['headers']['status-code']);
// Get target
$target = $user['body']['targets'][0];
// Create Subscriber
$subscriber = $this->client->call(Client::METHOD_POST, '/messaging/topics/' . $topic['body']['$id'] . '/subscribers', \array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), [
'subscriberId' => ID::unique(),
'targetId' => $user['body']['targets'][0]['$id'],
'targetId' => $target['$id'],
]);
$this->assertEquals(201, $subscriber['headers']['status-code']);
@ -759,7 +841,8 @@ trait MessagingBase
'name' => 'Msg91Sender',
'senderId' => $senderId,
'authKey' => $authKey,
'from' => $from
'from' => $from,
'enabled' => true,
]);
$this->assertEquals(201, $provider['headers']['status-code']);
@ -920,6 +1003,7 @@ trait MessagingBase
'providerId' => ID::unique(),
'name' => 'FCM-1',
'serviceAccountJSON' => $serviceAccountJSON,
'enabled' => true,
]);
$this->assertEquals(201, $provider['headers']['status-code']);
@ -1058,4 +1142,58 @@ trait MessagingBase
$this->assertEquals(1, $message['body']['deliveredTotal']);
$this->assertEquals(0, \count($message['body']['deliveryErrors']));
}
/**
* @depends testCreateDraftEmail
*/
public function testListTargets(array $message)
{
$response = $this->client->call(Client::METHOD_GET, '/messaging/messages/does_not_exist/targets', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]);
$this->assertEquals(404, $response['headers']['status-code']);
$response = $this->client->call(Client::METHOD_GET, '/messaging/messages/' . $message['$id'] . '/targets', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]);
$this->assertEquals(200, $response['headers']['status-code']);
$targetList = $response['body'];
$this->assertEquals(1, $targetList['total']);
$this->assertEquals(1, count($targetList['targets']));
$this->assertEquals($message['targets'][0], $targetList['targets'][0]['$id']);
// Test for empty targets
$response = $this->client->call(Client::METHOD_POST, '/messaging/messages/email', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
], [
'messageId' => ID::unique(),
'subject' => 'New blog post',
'content' => 'Check out the new blog post at http://localhost',
]);
$this->assertEquals(201, $response['headers']['status-code']);
$message = $response['body'];
$response = $this->client->call(Client::METHOD_GET, '/messaging/messages/' . $message['$id'] . '/targets', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]);
$this->assertEquals(200, $response['headers']['status-code']);
$targetList = $response['body'];
$this->assertEquals(0, $targetList['total']);
$this->assertEquals(0, count($targetList['targets']));
}
}