diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index e6ab3b40f..69ffdf4f2 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -143,6 +143,7 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits $realtime ->setEvent($events->getParam('event')) + ->setProject($project->getId()) ->setPayload($response->getPayload()) ->trigger(); diff --git a/app/init.php b/app/init.php index a4d2914b0..98ff4c923 100644 --- a/app/init.php +++ b/app/init.php @@ -323,7 +323,7 @@ App::setResource('events', function($register) { }, ['register']); App::setResource('realtime', function($register) { - return new Realtime('', []); + return new Realtime('', '', []); }, ['register']); App::setResource('audits', function($register) { diff --git a/app/realtime.php b/app/realtime.php index 2779d708f..aae73af2b 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -1,18 +1,29 @@ set([ + 'worker_num' => 1 + ]); $subscriptions = []; $connections = []; -$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections) { - Console::success('Worker '.++$workerId.' started succefully'); +$register = new Registry(); + +$register->set('db', function () { // Register DB connection + $dbHost = App::getEnv('_APP_DB_HOST', ''); + $dbUser = App::getEnv('_APP_DB_USER', ''); + $dbPass = App::getEnv('_APP_DB_PASS', ''); + $dbScheme = App::getEnv('_APP_DB_SCHEMA', ''); + + $pdo = new PDO("mysql:host={$dbHost};dbname={$dbScheme};charset=utf8mb4", $dbUser, $dbPass, array( + PDONative::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4', + PDONative::ATTR_TIMEOUT => 3, // Seconds + PDONative::ATTR_PERSISTENT => true + )); + + // Connection settings + $pdo->setAttribute(PDONative::ATTR_DEFAULT_FETCH_MODE, PDONative::FETCH_ASSOC); // Return arrays + $pdo->setAttribute(PDONative::ATTR_ERRMODE, PDONative::ERRMODE_EXCEPTION); // Handle all errors with exceptions + + return $pdo; +}); + +$register->set('cache', function () { // Register cache connection + $redis = new Redis(); + $redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); + $user = App::getEnv('_APP_REDIS_USER',''); + $pass = App::getEnv('_APP_REDIS_PASS',''); + $auth = []; + if(!empty($user)) { + $auth["user"] = $user; + } + if(!empty($pass)) { + $auth["pass"] = $pass; + } + if(!empty($auth)) { + $redis->auth($auth); + } + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); + + return $redis; +}); + +$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections, &$register) { + Console::success('Worker ' . ++$workerId . ' started succefully'); $attempts = 0; $start = time(); - + while ($attempts < 300) { try { - if($attempts > 0) { - Console::error('Pub/sub connection lost (lasted '.(time() - $start).' seconds, worker: '.$workerId.'). - Attempting restart in 5 seconds (attempt #'.$attempts.')'); + if ($attempts > 0) { + Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . '). + Attempting restart in 5 seconds (attempt #' . $attempts . ')'); sleep(5); // 1 sec delay between connection attempts } - $redis = new Redis(); - $redis->connect('redis', 6379); - $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); + $redis = $register->get('cache'); - if($redis->ping(true)) { + if ($redis->ping(true)) { $attempts = 0; - Console::success('Pub/sub connection established (worker: '.$workerId.')'); - } - else { - Console::error('Pub/sub failed (worker: '.$workerId.')'); + Console::success('Pub/sub connection established (worker: ' . $workerId . ')'); + } else { + Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) { - $message = 'Message from worker #'.$workerId.'; '.$message; - + $redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, &$connections, &$subscriptions) { // TODO get project and resource ID and itterate over the resource read(?) permissions and send a message to all listeners /** @@ -80,23 +130,43 @@ $server->on("workerStart", function ($server, $workerId) use (&$subscriptions, & * - Function * - Execution */ - - foreach($connections as $fd => $connection) { - if ($server->exist($fd) - && $server->isEstablished($fd) + $event = json_decode($payload); + + $receivers = []; + + foreach ($connections as $fd => $connection) { + if ($connection['projectId'] !== $event->project) { + continue; + } + + foreach ($connection['roles'] as $role) { + if (\array_key_exists($role, $subscriptions[$event->project])) { + foreach ($event->channels as $channel) { + if (\array_key_exists($channel, $subscriptions[$event->project][$role]) && \in_array($role, $event->permissions)) { + $receivers = array_merge($receivers, array_keys($subscriptions[$event->project][$role][$channel])); + break; + } + } + } + } + } + + $receivers = array_keys(array_flip($receivers)); + + foreach ($receivers as $receiver) { + if ($server->exist($receiver) + && $server->isEstablished($receiver) ) { - Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')'); - $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, + $server->push($receiver, json_encode($event->data), SWOOLE_WEBSOCKET_OPCODE_TEXT, SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); } else { - $server->close($fd); + $server->close($receiver); } } }); - } catch (\Throwable $th) { - Console::error('Pub/sub error: '.$th->getMessage()); + Console::error('Pub/sub error: ' . $th->getMessage()); $attempts++; continue; } @@ -119,7 +189,7 @@ $server->on("start", function (Server $server) { }); }); -$server->on('open', function(Server $server, Request $request) use (&$connections, &$subscriptions) { +$server->on('open', function (Server $server, Request $request) use (&$connections, &$subscriptions, &$register) { Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})"); $app = new App(''); @@ -130,33 +200,74 @@ $server->on('open', function(Server $server, Request $request) use (&$connection return $request; }); - App::setResource('response', function () { - return null; - }); + App::setResource('consoleDB', function () use (&$register) { + $consoleDB = new Database(); + $consoleDB->setAdapter(new MySQLAdapter($register)); // TODO: Add Redis + $consoleDB->setNamespace('app_console'); // Should be replaced with param if we want to have parent projects + $consoleDB->setMocks(Config::getParam('collections', [])); - App::setResource('project', function () { // TODO get project from query string - return new Document(); - }); + return $consoleDB; + }, ['register']); - App::setResource('user', function () { // TODO get user with JWT token - return new Document(); - }); + App::setResource('project', function ($consoleDB, $request) { + /** @var Utopia\Swoole\Request $request */ + /** @var Appwrite\Database\Database $consoleDB */ + + Authorization::disable(); + + $project = $consoleDB->getDocument($request->getQuery('project')); + + Authorization::reset(); + + return $project; + }, ['consoleDB', 'request']); + + App::setResource('user', function ($project, $request, $projectDB) { + /** @var Utopia\Swoole\Request $request */ + /** @var Appwrite\Database\Document $project */ + /** @var Appwrite\Database\Database $projectDB */ + + Authorization::setDefaultStatus(true); + + Auth::setCookieName('a_session_' . $project->getId()); + + $session = Auth::decodeSession( + $request->getCookie( + Auth::$cookieName, // Get sessions + $request->getCookie(Auth::$cookieName . '_legacy', '') + ) + ); // Get fallback session from old clients (no SameSite support) + + Auth::$unique = $session['id']; + Auth::$secret = $session['secret']; + + $user = $projectDB->getDocument(Auth::$unique); + + if ( + empty($user->getId()) // Check a document has been found in the DB + || Database::SYSTEM_COLLECTION_USERS !== $user->getCollection() // Validate returned document is really a user document + || !Auth::tokenVerify($user->getAttribute('tokens', []), Auth::TOKEN_TYPE_LOGIN, Auth::$secret) + ) { // Validate user has valid login token + $user = new Document(['$id' => '', '$collection' => Database::SYSTEM_COLLECTION_USERS]); + } + + return $user; + }, ['project', 'request', 'projectDB']); $channels = array_flip($request->getQuery('channels', [])); - $jwt = $request->getQuery('jwt', ''); $user = $app->getResource('user'); $project = $app->getResource('project'); - $roles = ['*', 'user:'.$user->getId(), 'role:'.(($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)]; + $roles = ['*', 'user:' . $user->getId(), 'role:' . (($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)]; /** @var Appwrite\Database\Document $user */ /** @var Appwrite\Database\Document $project */ \array_map(function ($node) use (&$roles) { if (isset($node['teamId']) && isset($node['roles'])) { - $roles[] = 'team:'.$node['teamId']; + $roles[] = 'team:' . $node['teamId']; foreach ($node['roles'] as $nodeRole) { // Set all team roles - $roles[] = 'team:'.$node['teamId'].'/'.$nodeRole; + $roles[] = 'team:' . $node['teamId'] . '/' . $nodeRole; } } }, $user->getAttribute('memberships', [])); @@ -175,15 +286,15 @@ $server->on('open', function(Server $server, Request $request) use (&$connection * [CHANNEL_NAME_Z] -> [CONNECTION_ID] */ - if(!isset($subscriptions[$project->getId()])) { // Init Project + if (!isset($subscriptions[$project->getId()])) { // Init Project $subscriptions[$project->getId()] = []; } foreach ($roles as $key => $role) { - if(!isset($subscriptions[$project->getId()][$role])) { // Add user first connection + if (!isset($subscriptions[$project->getId()][$role])) { // Add user first connection $subscriptions[$project->getId()][$role] = []; } - + foreach ($channels as $channel => $list) { $subscriptions[$project->getId()][$role][$channel][$connection] = true; } @@ -194,25 +305,18 @@ $server->on('open', function(Server $server, Request $request) use (&$connection 'roles' => $roles, ]; - var_dump($project->getId()); - var_dump($project->getAttribute('name')); - var_dump($user->getId()); - var_dump($user->getAttribute('name')); - $server->push($connection, json_encode($subscriptions)); }); -$server->on('message', function(Server $server, Frame $frame) { - if($frame->data === 'reload') { +$server->on('message', function (Server $server, Frame $frame) use (&$connections, &$subscriptions) { + if ($frame->data === 'reload') { $server->reload(); } - Console::info('Recieved message: '.$frame->data.' (user: '.$frame->fd.', worker: '.$server->getWorkerId().')'); - - $server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()])); + Console::info('Recieved message: ' . $frame->data . ' (user: ' . $frame->fd . ', worker: ' . $server->getWorkerId() . ')'); }); -$server->on('close', function(Server $server, int $fd) use (&$connections, &$subscriptions) { +$server->on('close', function (Server $server, int $fd) use (&$connections, &$subscriptions) { $projectId = $connections[$fd]['projectId'] ?? ''; $roles = $connections[$fd]['roles'] ?? []; @@ -220,25 +324,24 @@ $server->on('close', function(Server $server, int $fd) use (&$connections, &$sub foreach ($subscriptions[$projectId][$role] as $channel => $list) { unset($subscriptions[$projectId][$role][$channel][$fd]); // Remove connection - if(empty($subscriptions[$projectId][$role][$channel])) { + if (empty($subscriptions[$projectId][$role][$channel])) { unset($subscriptions[$projectId][$role][$channel]); // Remove channel when no connections } } - if(empty($subscriptions[$projectId][$role])) { + if (empty($subscriptions[$projectId][$role])) { unset($subscriptions[$projectId][$role]); // Remove role when no channels } } - if(empty($subscriptions[$projectId])) { // Remove project when no roles + if (empty($subscriptions[$projectId])) { // Remove project when no roles unset($subscriptions[$projectId]); } unset($connections[$fd]); - Console::info('Connection close: '.$fd); + Console::info('Connection close: ' . $fd); - var_dump($subscriptions); }); $server->start(); \ No newline at end of file diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index e1f1f887f..93f2f3050 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -7,6 +7,11 @@ use Utopia\App; class Realtime { + /** + * @var string + */ + protected $project = ''; + /** * @var string */ @@ -26,15 +31,35 @@ class Realtime /** * Event constructor. * + * @param string $project * @param string $event * @param array $payload */ - public function __construct(string $event, array $payload) + public function __construct(string $project, string $event, array $payload) { + $this->project = $project; $this->event = $event; $this->payload = new Document($payload); } + /** + * @param string $project + * return $this + */ + public function setProject(string $project): self + { + $this->project = $project; + return $this; + } + + /** + * @return string + */ + public function getProject(): string + { + return $this->project; + } + /** * @param string $event * return $this @@ -115,11 +140,13 @@ class Realtime $redis = new \Redis(); $redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); $redis->publish('realtime', json_encode([ + 'project' => $this->project, 'channels' => $this->channels, + 'permissions' => $this->payload->getAttribute('$permissions.read'), 'data' => [ 'event' => $this->event, 'timestamp' => time(), - 'payload' => $this->payload + 'payload' => $this->payload->getArrayCopy() ] ]));