diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 26948cef3..d275047c2 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -1,7 +1,9 @@ getParam('event'))) { @@ -198,12 +198,20 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits ->trigger(); if ($project->getId() !== 'console') { - $realtime - ->setEvent($events->getParam('event')) - ->setUserId($events->getParam('userId')) - ->setProject($project->getId()) - ->setPayload($response->getPayload()) - ->trigger(); + $payload = new Document($response->getPayload()); + $target = Realtime::fromPayload($events->getParam('event'), $payload); + + Realtime::send( + $project->getId(), + $response->getPayload(), + $events->getParam('event'), + $target['channels'], + $target['permissions'], + [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $events->getParam('userId') + ] + ); } } @@ -228,4 +236,4 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits ; } -}, ['utopia', 'request', 'response', 'project', 'events', 'audits', 'usage', 'deletes', 'realtime', 'mode'], 'api'); +}, ['utopia', 'request', 'response', 'project', 'events', 'audits', 'usage', 'deletes', 'mode'], 'api'); diff --git a/app/init.php b/app/init.php index 260682034..1bf450808 100644 --- a/app/init.php +++ b/app/init.php @@ -341,10 +341,6 @@ App::setResource('events', function($register) { return new Event('', ''); }, ['register']); -App::setResource('realtime', function($register) { - return new Realtime('', '', []); -}, ['register']); - App::setResource('audits', function($register) { return new Event(Event::AUDITS_QUEUE_NAME, Event::AUDITS_CLASS_NAME); }, ['register']); diff --git a/app/realtime.php b/app/realtime.php index 3f0fed508..31f1c2861 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -5,7 +5,6 @@ use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Database; use Appwrite\Event\Event; -use Appwrite\Event\Realtime as RealtimeEvent; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; use Swoole\Http\Request as SwooleRequest; @@ -267,7 +266,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, ...Auth::getRoles($user) ]; - $channels = RealtimeEvent::convertChannels($request->getQuery('channels', []), $user); + $channels = Realtime::convertChannels($request->getQuery('channels', []), $user); /** * Channels Check diff --git a/app/workers/functions.php b/app/workers/functions.php index 1910fdcac..f77c1728f 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -6,7 +6,7 @@ use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Validator\Authorization; use Appwrite\Event\Event; -use Appwrite\Event\Realtime; +use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Resque\Worker; use Appwrite\Utopia\Response\Model\Execution; use Cron\CronExpression; @@ -494,9 +494,15 @@ class FunctionsV1 extends Worker $executionUpdate->trigger(); - $realtimeUpdate = new Realtime($projectId, 'functions.executions.update', $execution->getArrayCopy()); + $target = Realtime::fromPayload('functions.executions.update', $execution); - $realtimeUpdate->trigger(); + Realtime::send( + $projectId, + $execution->getArrayCopy(), + 'functions.executions.update', + $target['channels'], + $target['permissions'] + ); $usage = new Event('v1-usage', 'UsageV1'); diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php deleted file mode 100644 index b11325150..000000000 --- a/src/Appwrite/Event/Realtime.php +++ /dev/null @@ -1,264 +0,0 @@ -project = $project; - $this->event = $event; - $this->payload = new Document($payload); - } - - /** - * @param string $project - * return $this - */ - public function setProject(string $project): self - { - $this->project = $project; - return $this; - } - - /** - * @param string $userId - * return $this - */ - public function setUserId(string $userId): self - { - $this->userId = $userId; - return $this; - } - - /** - * @return string - */ - public function getProject(): string - { - return $this->project; - } - - /** - * @param string $event - * return $this - */ - public function setEvent(string $event): self - { - $this->event = $event; - return $this; - } - - /** - * @return string - */ - public function getEvent(): string - { - return $this->event; - } - - /** - * @param array $payload - * @return $this - */ - public function setPayload(array $payload): self - { - $this->payload = new Document($payload); - return $this; - } - - /** - * @return Document - */ - public function getPayload(): Document - { - return $this->payload; - } - - /** - * Converts the channels from the Query Params into an array. - * Also renames the account channel to account.USER_ID and removes all illegal account channel variations. - * @param array $channels - * @param Document $user - * @return array - */ - public static function convertChannels(array $channels, Document $user): array - { - $channels = array_flip($channels); - - foreach ($channels as $key => $value) { - switch (true) { - case strpos($key, 'account.') === 0: - unset($channels[$key]); - break; - - case $key === 'account': - if (!empty($user->getId())) { - $channels['account.' . $user->getId()] = $value; - } - unset($channels['account']); - break; - } - } - - if (\array_key_exists('account', $channels)) { - if ($user->getId()) { - $channels['account.' . $user->getId()] = $channels['account']; - } - unset($channels['account']); - } - - return $channels; - } - - /** - * Populate channels array based on the event name and payload. - * - * @return void - */ - private function prepareChannels(): void - { - switch (true) { - case strpos($this->event, 'account.recovery.') === 0: - case strpos($this->event, 'account.sessions.') === 0: - case strpos($this->event, 'account.verification.') === 0: - $this->channels[] = 'account.' . $this->payload->getAttribute('userId'); - $this->permissions = ['user:' . $this->payload->getAttribute('userId')]; - - break; - case strpos($this->event, 'account.') === 0: - $this->channels[] = 'account.' . $this->payload->getId(); - $this->permissions = ['user:' . $this->payload->getId()]; - - break; - case strpos($this->event, 'teams.memberships') === 0: - $this->permissionsChanged = in_array($this->event, ['teams.memberships.update', 'teams.memberships.delete', 'teams.memberships.update.status']); - $this->channels[] = 'memberships'; - $this->channels[] = 'memberships.' . $this->payload->getId(); - $this->permissions = ['team:' . $this->payload->getAttribute('teamId')]; - - break; - case strpos($this->event, 'teams.') === 0: - $this->permissionsChanged = $this->event === 'teams.create'; - $this->channels[] = 'teams'; - $this->channels[] = 'teams.' . $this->payload->getId(); - $this->permissions = ['team:' . $this->payload->getId()]; - - break; - case strpos($this->event, 'database.collections.') === 0: - $this->channels[] = 'collections'; - $this->channels[] = 'collections.' . $this->payload->getId(); - $this->permissions = $this->payload->getAttribute('$permissions.read'); - - break; - case strpos($this->event, 'database.documents.') === 0: - $this->channels[] = 'documents'; - $this->channels[] = 'collections.' . $this->payload->getAttribute('$collection') . '.documents'; - $this->channels[] = 'documents.' . $this->payload->getId(); - $this->permissions = $this->payload->getAttribute('$permissions.read'); - - break; - case strpos($this->event, 'storage.') === 0: - $this->channels[] = 'files'; - $this->channels[] = 'files.' . $this->payload->getId(); - $this->permissions = $this->payload->getAttribute('$permissions.read'); - - break; - case strpos($this->event, 'functions.executions.') === 0: - if (!empty($this->payload->getAttribute('$permissions.read'))) { - $this->channels[] = 'executions'; - $this->channels[] = 'executions.' . $this->payload->getId(); - $this->channels[] = 'functions.' . $this->payload->getAttribute('functionId'); - $this->permissions = $this->payload->getAttribute('$permissions.read'); - } - break; - } - } - - /** - * Execute Event. - * - * @return void - */ - public function trigger(): void - { - $this->prepareChannels(); - if (empty($this->channels)) return; - - $redis = new \Redis(); - $redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); - $redis->publish('realtime', json_encode([ - 'project' => $this->project, - 'permissions' => $this->permissions, - 'permissionsChanged' => $this->permissionsChanged, - 'userId' => $this->userId, - 'data' => [ - 'event' => $this->event, - 'channels' => $this->channels, - 'timestamp' => time(), - 'payload' => $this->payload->getArrayCopy() - ] - ])); - - $this->reset(); - } - - /** - * Resets this event and unpopulates all data. - * - * @return $this - */ - public function reset(): self - { - $this->event = ''; - $this->payload = $this->channels = []; - - return $this; - } -} diff --git a/src/Appwrite/Messaging/Adapter.php b/src/Appwrite/Messaging/Adapter.php index 055e0e49d..d788e34d1 100644 --- a/src/Appwrite/Messaging/Adapter.php +++ b/src/Appwrite/Messaging/Adapter.php @@ -6,5 +6,5 @@ abstract class Adapter { public abstract function subscribe(string $project, mixed $identifier, array $roles, array $channels): void; public abstract function unsubscribe(mixed $identifier): void; - public abstract function send(string $projectId, string $event, array $payload): void; + public static abstract function send(string $projectId, array $payload, string $event, array $channels, array $permissions, array $options): void; } diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index e97710f9a..d513cf22b 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -2,8 +2,9 @@ namespace Appwrite\Messaging\Adapter; -use Appwrite\Event\Realtime as EventRealtime; +use Appwrite\Database\Document; use Appwrite\Messaging\Adapter; +use Utopia\App; class Realtime extends Adapter { @@ -116,15 +117,35 @@ class Realtime extends Adapter /** * Sends an event to the Realtime Server. - * @param string $projectId - * @param string $event + * @param string $project * @param array $payload + * @param string $event + * @param array $channels + * @param array $permissions + * @param array $options * @return void */ - public function send(string $projectId, string $event, array $payload): void + public static function send(string $project, array $payload, string $event, array $channels, array $permissions, array $options = []): void { - $realtime = new EventRealtime($projectId, $event, $payload); - $realtime->trigger(); + if (empty($channels) || empty($permissions) || empty($project)) return; + + $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; + $userId = array_key_exists('userId', $options) ? $options['userId'] : null; + + $redis = new \Redis(); + $redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); + $redis->publish('realtime', json_encode([ + 'project' => $project, + 'permissions' => $permissions, + 'permissionsChanged' => $permissionsChanged, + 'userId' => $userId, + 'data' => [ + 'event' => $event, + 'channels' => $channels, + 'timestamp' => time(), + 'payload' => $payload + ] + ])); } /** @@ -161,4 +182,114 @@ class Realtime extends Adapter return array_keys($receivers); } + + /** + * Converts the channels from the Query Params into an array. + * Also renames the account channel to account.USER_ID and removes all illegal account channel variations. + * @param array $channels + * @param Document $user + * @return array + */ + public static function convertChannels(array $channels, Document $user): array + { + $channels = array_flip($channels); + + foreach ($channels as $key => $value) { + switch (true) { + case strpos($key, 'account.') === 0: + unset($channels[$key]); + break; + + case $key === 'account': + if (!empty($user->getId())) { + $channels['account.' . $user->getId()] = $value; + } + unset($channels['account']); + break; + } + } + + if (\array_key_exists('account', $channels)) { + if ($user->getId()) { + $channels['account.' . $user->getId()] = $channels['account']; + } + unset($channels['account']); + } + + return $channels; + } + + /** + * Create channels array based on the event name and payload. + * + * @return void + */ + public static function fromPayload(string $event, Document $payload): array + { + $channels = []; + $permissions = []; + $permissionsChanged = false; + + switch (true) { + case strpos($event, 'account.recovery.') === 0: + case strpos($event, 'account.sessions.') === 0: + case strpos($event, 'account.verification.') === 0: + $channels[] = 'account.' . $payload->getAttribute('userId'); + $permissions = ['user:' . $payload->getAttribute('userId')]; + + break; + case strpos($event, 'account.') === 0: + $channels[] = 'account.' . $payload->getId(); + $permissions = ['user:' . $payload->getId()]; + + break; + case strpos($event, 'teams.memberships') === 0: + $permissionsChanged = in_array($event, ['teams.memberships.update', 'teams.memberships.delete', 'teams.memberships.update.status']); + $channels[] = 'memberships'; + $channels[] = 'memberships.' . $payload->getId(); + $permissions = ['team:' . $payload->getAttribute('teamId')]; + + break; + case strpos($event, 'teams.') === 0: + $permissionsChanged = $event === 'teams.create'; + $channels[] = 'teams'; + $channels[] = 'teams.' . $payload->getId(); + $permissions = ['team:' . $payload->getId()]; + + break; + case strpos($event, 'database.collections.') === 0: + $channels[] = 'collections'; + $channels[] = 'collections.' . $payload->getId(); + $permissions = $payload->getAttribute('$permissions.read'); + + break; + case strpos($event, 'database.documents.') === 0: + $channels[] = 'documents'; + $channels[] = 'collections.' . $payload->getAttribute('$collection') . '.documents'; + $channels[] = 'documents.' . $payload->getId(); + $permissions = $payload->getAttribute('$permissions.read'); + + break; + case strpos($event, 'storage.') === 0: + $channels[] = 'files'; + $channels[] = 'files.' . $payload->getId(); + $permissions = $payload->getAttribute('$permissions.read'); + + break; + case strpos($event, 'functions.executions.') === 0: + if (!empty($payload->getAttribute('$permissions.read'))) { + $channels[] = 'executions'; + $channels[] = 'executions.' . $payload->getId(); + $channels[] = 'functions.' . $payload->getAttribute('functionId'); + $permissions = $payload->getAttribute('$permissions.read'); + } + break; + } + + return [ + 'channels' => $channels, + 'permissions' => $permissions, + 'permissionsChanged' => $permissionsChanged + ]; + } } diff --git a/tests/unit/Event/RealtimeTest.php b/tests/unit/Event/RealtimeTest.php deleted file mode 100644 index e4aef27ed..000000000 --- a/tests/unit/Event/RealtimeTest.php +++ /dev/null @@ -1,73 +0,0 @@ - '' - ]); - $channels = [ - 0 => 'files', - 1 => 'documents', - 2 => 'documents.789', - 3 => 'account', - 4 => 'account.456' - ]; - - $channels = Realtime::convertChannels($channels, $user); - $this->assertCount(3, $channels); - $this->assertArrayHasKey('files', $channels); - $this->assertArrayHasKey('documents', $channels); - $this->assertArrayHasKey('documents.789', $channels); - $this->assertArrayNotHasKey('account', $channels); - $this->assertArrayNotHasKey('account.456', $channels); - } - - public function testConvertChannelsUser() - { - $user = new Document([ - '$id' => '123', - 'memberships' => [ - [ - 'teamId' => 'abc', - 'roles' => [ - 'administrator', - 'moderator' - ] - ], - [ - 'teamId' => 'def', - 'roles' => [ - 'guest' - ] - ] - ] - ]); - $channels = [ - 0 => 'files', - 1 => 'documents', - 2 => 'documents.789', - 3 => 'account', - 4 => 'account.456' - ]; - - $channels = Realtime::convertChannels($channels, $user); - - $this->assertCount(4, $channels); - $this->assertArrayHasKey('files', $channels); - $this->assertArrayHasKey('documents', $channels); - $this->assertArrayHasKey('documents.789', $channels); - $this->assertArrayHasKey('account.123', $channels); - $this->assertArrayNotHasKey('account', $channels); - $this->assertArrayNotHasKey('account.456', $channels); - } -} diff --git a/tests/unit/Messaging/MessagingChannelsTest.php b/tests/unit/Messaging/MessagingChannelsTest.php index 4159def55..23ad320ed 100644 --- a/tests/unit/Messaging/MessagingChannelsTest.php +++ b/tests/unit/Messaging/MessagingChannelsTest.php @@ -4,8 +4,6 @@ namespace Appwrite\Tests; use Appwrite\Auth\Auth; use Appwrite\Database\Document; -use Appwrite\Event\Realtime as EventRealtime; -use Appwrite\Messaging; use Appwrite\Messaging\Adapter\Realtime; use PHPUnit\Framework\TestCase; @@ -67,7 +65,7 @@ class MessagingChannelsTest extends TestCase ...Auth::getRoles($user) ]; - $parsedChannels = EventRealtime::convertChannels([0 => $channel], $user); + $parsedChannels = Realtime::convertChannels([0 => $channel], $user); $this->realtime->subscribe( '1', @@ -94,7 +92,7 @@ class MessagingChannelsTest extends TestCase ...Auth::getRoles($user) ]; - $parsedChannels = EventRealtime::convertChannels([0 => $channel], $user); + $parsedChannels = Realtime::convertChannels([0 => $channel], $user); $this->realtime->subscribe( '1', diff --git a/tests/unit/Messaging/MessagingTest.php b/tests/unit/Messaging/MessagingTest.php index 6cd6a6967..d0d0aab78 100644 --- a/tests/unit/Messaging/MessagingTest.php +++ b/tests/unit/Messaging/MessagingTest.php @@ -2,6 +2,7 @@ namespace Appwrite\Tests; +use Appwrite\Database\Document; use Appwrite\Messaging\Adapter\Realtime; use PHPUnit\Framework\TestCase; @@ -132,4 +133,66 @@ class MessagingTest extends TestCase $this->assertEmpty($realtime->connections); $this->assertEmpty($realtime->subscriptions); } + + public function testConvertChannelsGuest() + { + $user = new Document([ + '$id' => '' + ]); + + $channels = [ + 0 => 'files', + 1 => 'documents', + 2 => 'documents.789', + 3 => 'account', + 4 => 'account.456' + ]; + + $channels = Realtime::convertChannels($channels, $user); + $this->assertCount(3, $channels); + $this->assertArrayHasKey('files', $channels); + $this->assertArrayHasKey('documents', $channels); + $this->assertArrayHasKey('documents.789', $channels); + $this->assertArrayNotHasKey('account', $channels); + $this->assertArrayNotHasKey('account.456', $channels); + } + + public function testConvertChannelsUser() + { + $user = new Document([ + '$id' => '123', + 'memberships' => [ + [ + 'teamId' => 'abc', + 'roles' => [ + 'administrator', + 'moderator' + ] + ], + [ + 'teamId' => 'def', + 'roles' => [ + 'guest' + ] + ] + ] + ]); + $channels = [ + 0 => 'files', + 1 => 'documents', + 2 => 'documents.789', + 3 => 'account', + 4 => 'account.456' + ]; + + $channels = Realtime::convertChannels($channels, $user); + + $this->assertCount(4, $channels); + $this->assertArrayHasKey('files', $channels); + $this->assertArrayHasKey('documents', $channels); + $this->assertArrayHasKey('documents.789', $channels); + $this->assertArrayHasKey('account.123', $channels); + $this->assertArrayNotHasKey('account', $channels); + $this->assertArrayNotHasKey('account.456', $channels); + } }