diff --git a/app/realtime.php b/app/realtime.php index 5b087aa96..5c8ed6dae 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -5,8 +5,9 @@ use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Database; use Appwrite\Event\Event; +use Appwrite\Event\Realtime as RealtimeEvent; +use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; -use Appwrite\Realtime\Parser; use Swoole\Http\Request as SwooleRequest; use Swoole\Http\Response as SwooleResponse; use Swoole\Process; @@ -44,6 +45,8 @@ $stats->create(); $server = new Server($adapter); +$realtime = new Realtime(); + $server->onStart(function (SwooleServer $server) use ($stats) { Console::success('Server started succefully'); Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}"); @@ -83,7 +86,7 @@ $server->onStart(function (SwooleServer $server) use ($stats) { }); }); -$server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use ($server, $register, $stats, &$subscriptions, &$connections) { +$server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use ($server, $register, $stats, $realtime) { Console::success('Worker ' . $workerId . ' started succefully'); $attempts = 0; @@ -93,22 +96,22 @@ $server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use /** * Sending current connections to project channels on the console project every 5 seconds. */ - Timer::tick(5000, function () use ($server, $stats, &$subscriptions) { - if ( - array_key_exists('console', $subscriptions) - && array_key_exists('role:member', $subscriptions['console']) - && array_key_exists('project', $subscriptions['console']['role:member']) - ) { + Timer::tick(5000, function () use ($server, $stats, $realtime) { + if ($realtime->hasSubscriber('console', 'role:member', 'project')) { $payload = []; foreach ($stats as $projectId => $value) { $payload[$projectId] = $value['connectionsTotal']; } - $server->send(array_keys($subscriptions['console']['role:member']['project']), json_encode([ + + $event = [ 'event' => 'stats.connections', 'channels' => ['project'], + 'permissions' => ['role:member'], 'timestamp' => time(), 'payload' => $payload - ])); + ]; + + $server->send($realtime->getReceivers($event), json_encode($event)); } }); @@ -132,43 +135,38 @@ $server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, $stats, $register, &$connections, &$subscriptions) { + $redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, $stats, $register, $realtime) { $event = json_decode($payload, true); if ($event['permissionsChanged'] && isset($event['userId'])) { - $project = $event['project']; + $projectId = $event['project']; $userId = $event['userId']; - if (array_key_exists($project, $subscriptions) && array_key_exists('user:' . $userId, $subscriptions[$project])) { - $connection = array_key_first(reset($subscriptions[$project]['user:' . $userId])); + if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) { + $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId])); } else { return; } - /** - * This is redundant soon and will be gone with merging the usage branch. - */ $db = $register->get('dbPool')->get(); $cache = $register->get('redisPool')->get(); $projectDB = new Database(); $projectDB->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); - $projectDB->setNamespace('app_' . $project); + $projectDB->setNamespace('app_' . $projectId); $projectDB->setMocks(Config::getParam('collections', [])); $user = $projectDB->getDocument($userId); - Parser::setUser($user); - $roles = Auth::getRoles($user); - Parser::subscribe($project, $connection, $roles, $subscriptions, $connections, $connections[$connection]['channels']); + $realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']); $register->get('dbPool')->put($db); $register->get('redisPool')->put($cache); } - $receivers = Parser::identifyReceivers($event, $subscriptions); + $receivers = $realtime->getReceivers($event); // Temporarily print debug logs by default for Alpha testing. // if (App::isDevelopment() && !empty($receivers)) { @@ -200,7 +198,7 @@ $server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use Console::error('Failed to restart pub/sub...'); }); -$server->onOpen(function (SwooleServer $swooleServer, SwooleRequest $request) use ($server, $register, $stats, &$subscriptions, &$connections) { +$server->onOpen(function (SwooleServer $swooleServer, SwooleRequest $request) use ($server, $register, $stats, &$realtime) { $app = new App('UTC'); $connection = $request->fd; $request = new Request($request); @@ -276,10 +274,12 @@ $server->onOpen(function (SwooleServer $swooleServer, SwooleRequest $request) us throw new Exception($originValidator->getDescription(), 1008); } - Parser::setUser($user); + $roles = [ + 'role:' . (($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER), + ...Auth::getRoles($user) + ]; - $roles = Parser::getRoles(); - $channels = Parser::parseChannels($request->getQuery('channels', [])); + $channels = RealtimeEvent::convertChannels($request->getQuery('channels', []), $user); /** * Channels Check @@ -288,7 +288,7 @@ $server->onOpen(function (SwooleServer $swooleServer, SwooleRequest $request) us throw new Exception('Missing channels', 1008); } - Parser::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels); + $realtime->subscribe($project->getId(), $connection, $roles, $channels); $server->send([$connection], json_encode($channels)); @@ -322,11 +322,12 @@ $server->onMessage(function (SwooleServer $swooleServer, Frame $frame) use ($ser $server->close($connection, 1003); }); -$server->onClose(function (SwooleServer $server, int $connection) use (&$connections, &$subscriptions, $stats) { - if (array_key_exists($connection, $connections)) { - $stats->decr($connections[$connection]['projectId'], 'connectionsTotal'); +$server->onClose(function (SwooleServer $server, int $connection) use ($realtime, $stats) { + if (array_key_exists($connection, $realtime->connections)) { + $stats->decr($realtime->connections[$connection]['projectId'], 'connectionsTotal'); } - Parser::unsubscribe($connection, $subscriptions, $connections); + $realtime->unsubscribe($connection); + Console::info('Connection close: ' . $connection); }); diff --git a/src/Appwrite/Auth/Auth.php b/src/Appwrite/Auth/Auth.php index ffdf1e631..8dfe9cbd7 100644 --- a/src/Appwrite/Auth/Auth.php +++ b/src/Appwrite/Auth/Auth.php @@ -280,10 +280,10 @@ class Auth */ public static function getRoles(Document $user): array { - $roles = []; - if ($user->getId()) { $roles[] = 'user:'.$user->getId(); + } else { + return []; } foreach ($user->getAttribute('memberships', []) as $node) { diff --git a/src/Appwrite/Database/Pool.php b/src/Appwrite/Database/Pool.php deleted file mode 100644 index ee8902d3d..000000000 --- a/src/Appwrite/Database/Pool.php +++ /dev/null @@ -1,21 +0,0 @@ -available = false; - while (!$this->pool->isEmpty()) { - $this->pool->pop(); - } - } -} diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index 594206432..b11325150 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -121,6 +121,42 @@ class Realtime return $this->payload; } + /** + * Converts the channels from the Query Params into an array. + * Also renames the account channel to account.USER_ID and removes all illegal account channel variations. + * @param array $channels + * @param Document $user + * @return array + */ + public static function convertChannels(array $channels, Document $user): array + { + $channels = array_flip($channels); + + foreach ($channels as $key => $value) { + switch (true) { + case strpos($key, 'account.') === 0: + unset($channels[$key]); + break; + + case $key === 'account': + if (!empty($user->getId())) { + $channels['account.' . $user->getId()] = $value; + } + unset($channels['account']); + break; + } + } + + if (\array_key_exists('account', $channels)) { + if ($user->getId()) { + $channels['account.' . $user->getId()] = $channels['account']; + } + unset($channels['account']); + } + + return $channels; + } + /** * Populate channels array based on the event name and payload. * diff --git a/src/Appwrite/Messaging/Adapter.php b/src/Appwrite/Messaging/Adapter.php new file mode 100644 index 000000000..055e0e49d --- /dev/null +++ b/src/Appwrite/Messaging/Adapter.php @@ -0,0 +1,10 @@ + + * 'projectId' -> [PROJECT_ID] + * 'roles' -> [ROLE_x, ROLE_Y] + * 'channels' -> [CHANNEL_NAME_X, CHANNEL_NAME_Y, CHANNEL_NAME_Z] + */ + public array $connections = []; + + /** + * Subscription Tree + * + * [PROJECT_ID] -> + * [ROLE_X] -> + * [CHANNEL_NAME_X] -> [CONNECTION_ID] + * [CHANNEL_NAME_Y] -> [CONNECTION_ID] + * [CHANNEL_NAME_Z] -> [CONNECTION_ID] + * [ROLE_Y] -> + * [CHANNEL_NAME_X] -> [CONNECTION_ID] + * [CHANNEL_NAME_Y] -> [CONNECTION_ID] + * [CHANNEL_NAME_Z] -> [CONNECTION_ID] + */ + public array $subscriptions = []; + + /** + * Adds a subscribtion. + * @param string $projectId Project ID. + * @param mixed $connection Unique Identifier - Connection ID. + * @param array $roles Roles of the Subscription. + * @param array $channels Subscribed Channels. + * @return void + */ + public function subscribe(string $projectId, mixed $connection, array $roles, array $channels): void + { + if (!isset($this->subscriptions[$projectId])) { // Init Project + $this->subscriptions[$projectId] = []; + } + + foreach ($roles as $role) { + if (!isset($this->subscriptions[$projectId][$role])) { // Add user first connection + $this->subscriptions[$projectId][$role] = []; + } + + foreach ($channels as $channel => $list) { + $this->subscriptions[$projectId][$role][$channel][$connection] = true; + } + } + + $this->connections[$connection] = [ + 'projectId' => $projectId, + 'roles' => $roles, + 'channels' => $channels + ]; + } + + /** + * Removes Subscription. + * + * @param mixed $connection + * @return void + */ + public function unsubscribe(mixed $connection): void + { + $projectId = $this->connections[$connection]['projectId'] ?? ''; + $roles = $this->connections[$connection]['roles'] ?? []; + + foreach ($roles as $role) { + foreach ($this->subscriptions[$projectId][$role] as $channel => $list) { + unset($this->subscriptions[$projectId][$role][$channel][$connection]); // Remove connection + + if (empty($this->subscriptions[$projectId][$role][$channel])) { + unset($this->subscriptions[$projectId][$role][$channel]); // Remove channel when no connections + } + } + + if (empty($this->subscriptions[$projectId][$role])) { + unset($this->subscriptions[$projectId][$role]); // Remove role when no channels + } + } + + if (empty($this->subscriptions[$projectId])) { // Remove project when no roles + unset($this->subscriptions[$projectId]); + } + + unset($this->connections[$connection]); + } + + /** + * Checks if Channel has a subscriber. + * @param string $projectId + * @param string $role + * @param string $channel + * @return bool + */ + public function hasSubscriber(string $projectId, string $role, string $channel = ''): bool + { + if (empty($channel)) { + return array_key_exists($projectId, $this->subscriptions) + && array_key_exists($role, $this->subscriptions[$projectId]); + } + + return array_key_exists($projectId, $this->subscriptions) + && array_key_exists($role, $this->subscriptions[$projectId]) + && array_key_exists($channel, $this->subscriptions[$projectId][$role]); + } + + /** + * Sends an event to the Realtime Server. + * @param string $projectId + * @param string $event + * @param array $payload + * @return void + */ + public function send(string $projectId, string $event, array $payload): void + { + $realtime = new EventRealtime($projectId, $event, $payload); + $realtime->trigger(); + } + + /** + * Identifies the receivers of all subscriptions, based on the permissions and event. + * + * Example of performance with an event with user:XXX permissions and with X users spread across 10 different channels: + * - 0.014 ms (±6.88%) | 10 Connections / 100 Subscriptions + * - 0.070 ms (±3.71%) | 100 Connections / 1,000 Subscriptions + * - 0.846 ms (±2.74%) | 1,000 Connections / 10,000 Subscriptions + * - 10.866 ms (±1.01%) | 10,000 Connections / 100,000 Subscriptions + * - 110.201 ms (±2.32%) | 100,000 Connections / 1,000,000 Subscriptions + * - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions + * + * @param array $event + */ + public function getReceivers(array $event) + { + $receivers = []; + if (isset($this->subscriptions[$event['project']])) { + foreach ($this->subscriptions[$event['project']] as $role => $subscription) { + foreach ($event['data']['channels'] as $channel) { + if ( + \array_key_exists($channel, $this->subscriptions[$event['project']][$role]) + && (\in_array($role, $event['permissions']) || \in_array('*', $event['permissions'])) + ) { + foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $ids) { + $receivers[$ids] = 0; + } + break; + } + } + } + } + + return array_keys($receivers); + } +} diff --git a/src/Appwrite/Realtime/Parser.php b/src/Appwrite/Realtime/Parser.php deleted file mode 100644 index 8a5fd1bfc..000000000 --- a/src/Appwrite/Realtime/Parser.php +++ /dev/null @@ -1,202 +0,0 @@ -isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)]; - if (!(self::$user->isEmpty())) { - $roles[] = 'user:' . self::$user->getId(); - } - foreach (self::$user->getAttribute('memberships', []) as $node) { - if (isset($node['teamId']) && isset($node['roles'])) { - $roles[] = 'team:' . $node['teamId']; - - foreach ($node['roles'] as $nodeRole) { // Set all team roles - $roles[] = 'team:' . $node['teamId'] . '/' . $nodeRole; - } - } - } - return $roles; - } - - /** - * Converts the channels from the Query Params into an array. - * Also renames the account channel to account.USER_ID and removes all illegal account channel variations. - * - * @param array $channels - */ - static function parseChannels(array $channels) - { - $channels = array_flip($channels); - - foreach ($channels as $key => $value) { - switch (true) { - case strpos($key, 'account.') === 0: - unset($channels[$key]); - break; - - case $key === 'account': - if (!empty(self::$user->getId())) { - $channels['account.' . self::$user->getId()] = $value; - } - unset($channels['account']); - break; - } - } - - if (\array_key_exists('account', $channels)) { - if (self::$user->getId()) { - $channels['account.' . self::$user->getId()] = $channels['account']; - } - unset($channels['account']); - } - - return $channels; - } - - /** - * Identifies the receivers of all subscriptions, based on the permissions and event. - * - * Example of performance with an event with user:XXX permissions and with X users spread across 10 different channels: - * - 0.014 ms (±6.88%) | 10 Connections / 100 Subscriptions - * - 0.070 ms (±3.71%) | 100 Connections / 1,000 Subscriptions - * - 0.846 ms (±2.74%) | 1,000 Connections / 10,000 Subscriptions - * - 10.866 ms (±1.01%) | 10,000 Connections / 100,000 Subscriptions - * - 110.201 ms (±2.32%) | 100,000 Connections / 1,000,000 Subscriptions - * - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions - * - * @param array $event - * @param array $connections - * @param array $subscriptions - */ - static function identifyReceivers(array &$event, array &$subscriptions) - { - $receivers = []; - if (isset($subscriptions[$event['project']])) { - foreach ($subscriptions[$event['project']] as $role => $subscription) { - foreach ($event['data']['channels'] as $channel) { - if ( - \array_key_exists($channel, $subscriptions[$event['project']][$role]) - && (\in_array($role, $event['permissions']) || \in_array('*', $event['permissions'])) - ) { - foreach (array_keys($subscriptions[$event['project']][$role][$channel]) as $ids) { - $receivers[$ids] = 0; - } - break; - } - } - } - } - - return array_keys($receivers); - } - - /** - * Adds Subscription. - * - * @param string $projectId - * @param mixed $connection - * @param array $subscriptions - * @param array $roles - * @param array $channels - */ - static function subscribe($projectId, $connection, $roles, &$subscriptions, &$connections, &$channels) - { - /** - * Build Subscriptions Tree - * - * [PROJECT_ID] -> - * [ROLE_X] -> - * [CHANNEL_NAME_X] -> [CONNECTION_ID] - * [CHANNEL_NAME_Y] -> [CONNECTION_ID] - * [CHANNEL_NAME_Z] -> [CONNECTION_ID] - * [ROLE_Y] -> - * [CHANNEL_NAME_X] -> [CONNECTION_ID] - * [CHANNEL_NAME_Y] -> [CONNECTION_ID] - * [CHANNEL_NAME_Z] -> [CONNECTION_ID] - */ - - if (!isset($subscriptions[$projectId])) { // Init Project - $subscriptions[$projectId] = []; - } - - foreach ($roles as $role) { - if (!isset($subscriptions[$projectId][$role])) { // Add user first connection - $subscriptions[$projectId][$role] = []; - } - - foreach ($channels as $channel => $list) { - $subscriptions[$projectId][$role][$channel][$connection] = true; - } - } - - $connections[$connection] = [ - 'projectId' => $projectId, - 'roles' => $roles, - 'channels' => $channels - ]; - } - - /** - * Remove Subscription. - * - * @param mixed $connection - * @param array $subscriptions - * @param array $connections - */ - static function unsubscribe($connection, &$subscriptions, &$connections) - { - $projectId = $connections[$connection]['projectId'] ?? ''; - $roles = $connections[$connection]['roles'] ?? []; - - foreach ($roles as $role) { - foreach ($subscriptions[$projectId][$role] as $channel => $list) { - unset($subscriptions[$projectId][$role][$channel][$connection]); // Remove connection - - if (empty($subscriptions[$projectId][$role][$channel])) { - unset($subscriptions[$projectId][$role][$channel]); // Remove channel when no connections - } - } - - if (empty($subscriptions[$projectId][$role])) { - unset($subscriptions[$projectId][$role]); // Remove role when no channels - } - } - - if (empty($subscriptions[$projectId])) { // Remove project when no roles - unset($subscriptions[$projectId]); - } - - unset($connections[$connection]); - } -}