From 780841853e82543611af579829e74c526f0af71c Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Sun, 21 Feb 2021 23:22:32 +0200 Subject: [PATCH] Fixed code to work with 0.7 changes --- app/controllers/api/health.php | 3 +- app/realtime.php | 96 ++++++++++++++++++++++++---------- docker-compose.yml | 3 ++ 3 files changed, 74 insertions(+), 28 deletions(-) diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 8d1b797b6..c8559278f 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -37,6 +37,7 @@ App::get('/v1/health/realtime') ->desc('Get Realtime') ->groups(['api', 'health']) ->label('scope', 'public') + ->inject('response') ->action(function ($response) { /** @var Utopia\Response $response */ $redis = new Redis(); @@ -44,7 +45,7 @@ App::get('/v1/health/realtime') $redis->publish('realtime', 'I\'m a live message'); $response->json(['status' => 'OK']); - }, ['response']); + }); App::get('/v1/health/db') ->desc('Get DB') diff --git a/app/realtime.php b/app/realtime.php index 9c3927cc8..2779d708f 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -4,14 +4,14 @@ require_once __DIR__.'/init.php'; require_once __DIR__.'/../vendor/autoload.php'; use Appwrite\Auth\Auth; -use Appwrite\Swoole\Request as SwooleRequest; +use Appwrite\Database\Document; use Swoole\WebSocket\Server; use Swoole\Http\Request; use Swoole\Process; use Swoole\WebSocket\Frame; use Utopia\App; use Utopia\CLI\Console; -use Utopia\Route; +use Utopia\Swoole\Request as SwooleRequest; /** * TODO List @@ -36,7 +36,7 @@ $server = new Server("0.0.0.0", 80); $subscriptions = []; $connections = []; -$server->on("workerStart", function ($server, $workerId) use (&$subscriptions) { +$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections) { Console::success('Worker '.++$workerId.' started succefully'); $attempts = 0; @@ -62,22 +62,37 @@ $server->on("workerStart", function ($server, $workerId) use (&$subscriptions) { Console::error('Pub/sub failed (worker: '.$workerId.')'); } - $redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId) { + $redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) { $message = 'Message from worker #'.$workerId.'; '.$message; - - // foreach($connections as $fd) { - // if ($server->exist($fd) - // && $server->isEstablished($fd) - // ) { - // Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')'); - // $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, - // SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); - // } - // else { - // $server->close($fd); - // } - // } + // TODO get project and resource ID and itterate over the resource read(?) permissions and send a message to all listeners + + /** + * Supported Resources: + * - Collection + * - Document + * - Bucket + * - File + * - User? / Account? (no permissions) + * - Session? (no permissions) + * - Team? (no permissions) + * - Membership? (no permissions) + * - Function + * - Execution + */ + + foreach($connections as $fd => $connection) { + if ($server->exist($fd) + && $server->isEstablished($fd) + ) { + Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')'); + $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, + SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); + } + else { + $server->close($fd); + } + } }); } catch (\Throwable $th) { @@ -119,19 +134,23 @@ $server->on('open', function(Server $server, Request $request) use (&$connection return null; }); + App::setResource('project', function () { // TODO get project from query string + return new Document(); + }); + + App::setResource('user', function () { // TODO get user with JWT token + return new Document(); + }); + $channels = array_flip($request->getQuery('channels', [])); + $jwt = $request->getQuery('jwt', ''); $user = $app->getResource('user'); $project = $app->getResource('project'); - $roles = ['user:'.$user->getId(), 'role:'.(($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)]; + $roles = ['*', 'user:'.$user->getId(), 'role:'.(($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)]; /** @var Appwrite\Database\Document $user */ /** @var Appwrite\Database\Document $project */ - var_dump($project->getId()); - var_dump($project->getAttribute('name')); - var_dump($user->getId()); - var_dump($user->getAttribute('name')); - \array_map(function ($node) use (&$roles) { if (isset($node['teamId']) && isset($node['roles'])) { $roles[] = 'team:'.$node['teamId']; @@ -142,6 +161,20 @@ $server->on('open', function(Server $server, Request $request) use (&$connection } }, $user->getAttribute('memberships', [])); + /** + * Build Subscriptions Tree + * + * [PROJECT_ID] -> + * [ROLE_X] -> + * [CHANNEL_NAME_X] -> [CONNECTION_ID] + * [CHANNEL_NAME_Y] -> [CONNECTION_ID] + * [CHANNEL_NAME_Z] -> [CONNECTION_ID] + * [ROLE_Y] -> + * [CHANNEL_NAME_X] -> [CONNECTION_ID] + * [CHANNEL_NAME_Y] -> [CONNECTION_ID] + * [CHANNEL_NAME_Z] -> [CONNECTION_ID] + */ + if(!isset($subscriptions[$project->getId()])) { // Init Project $subscriptions[$project->getId()] = []; } @@ -161,6 +194,11 @@ $server->on('open', function(Server $server, Request $request) use (&$connection 'roles' => $roles, ]; + var_dump($project->getId()); + var_dump($project->getAttribute('name')); + var_dump($user->getId()); + var_dump($user->getAttribute('name')); + $server->push($connection, json_encode($subscriptions)); }); @@ -175,8 +213,6 @@ $server->on('message', function(Server $server, Frame $frame) { }); $server->on('close', function(Server $server, int $fd) use (&$connections, &$subscriptions) { - Console::error('Connection close: '.$fd); - $projectId = $connections[$fd]['projectId'] ?? ''; $roles = $connections[$fd]['roles'] ?? []; @@ -185,17 +221,23 @@ $server->on('close', function(Server $server, int $fd) use (&$connections, &$sub unset($subscriptions[$projectId][$role][$channel][$fd]); // Remove connection if(empty($subscriptions[$projectId][$role][$channel])) { - unset($subscriptions[$projectId][$role][$channel]); // Remove channel + unset($subscriptions[$projectId][$role][$channel]); // Remove channel when no connections } } if(empty($subscriptions[$projectId][$role])) { - unset($subscriptions[$projectId][$role]); // Remove role + unset($subscriptions[$projectId][$role]); // Remove role when no channels } } + if(empty($subscriptions[$projectId])) { // Remove project when no roles + unset($subscriptions[$projectId]); + } + unset($connections[$fd]); + Console::info('Connection close: '.$fd); + var_dump($subscriptions); }); diff --git a/docker-compose.yml b/docker-compose.yml index 6fef7a4e2..9d9577a9f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -145,6 +145,9 @@ services: - traefik.http.routers.appwrite_realtime_wss.tls.certresolver=dns networks: - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src depends_on: - redis environment: