From c08b0a5d2bac2bf83caecc3d0f12f143271613ef Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Fri, 11 Jun 2021 15:30:33 +0200 Subject: [PATCH 01/15] feat(realtime): collection for concurrent connections --- app/config/collections.php | 33 ++++++++++++++++++++++++++++++ src/Appwrite/Database/Database.php | 3 +++ 2 files changed, 36 insertions(+) diff --git a/app/config/collections.php b/app/config/collections.php index 6789237c9..d1d623079 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -1698,6 +1698,39 @@ $collections = [ ], ], ], + Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS => [ + '$collection' => Database::SYSTEM_COLLECTION_COLLECTIONS, + '$id' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$permissions' => ['read' => ['*']], + 'name' => 'Realtime Connections', + 'structure' => true, + 'rules' => [ + [ + '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + 'label' => 'Container', + 'key' => 'container', + 'type' => Database::SYSTEM_VAR_TYPE_TEXT, + 'required' => true, + 'array' => false, + ], + [ + '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + 'label' => 'Timestamp', + 'key' => 'timestamp', + 'type' => Database::SYSTEM_VAR_TYPE_NUMERIC, + 'required' => true, + 'array' => false, + ], + [ + '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + 'label' => 'Data', + 'key' => 'data', + 'type' => Database::SYSTEM_VAR_TYPE_TEXT, + 'required' => true, + 'array' => false, + ], + ], + ], Database::SYSTEM_COLLECTION_RESERVED => [ '$collection' => Database::SYSTEM_COLLECTION_COLLECTIONS, '$id' => Database::SYSTEM_COLLECTION_RESERVED, diff --git a/src/Appwrite/Database/Database.php b/src/Appwrite/Database/Database.php index d0defdec0..f0fbfc2ef 100644 --- a/src/Appwrite/Database/Database.php +++ b/src/Appwrite/Database/Database.php @@ -41,6 +41,9 @@ class Database const SYSTEM_COLLECTION_FUNCTIONS = 'functions'; const SYSTEM_COLLECTION_TAGS = 'tags'; const SYSTEM_COLLECTION_EXECUTIONS = 'executions'; + + // Realtime + const SYSTEM_COLLECTION_REALTIME_CONNECTIONS = 'realtimeConnections'; // Var Types const SYSTEM_VAR_TYPE_TEXT = 'text'; From d8bae254383f5acd4d5e0db3d4fde704eed629e8 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 15 Jun 2021 09:15:14 +0200 Subject: [PATCH 02/15] fix(realtime): phpdoc and access modifiers --- docker-compose.yml | 38 ++++++++++++++-------------- src/Appwrite/Realtime/Server.php | 43 ++++++++++++++++++-------------- 2 files changed, 43 insertions(+), 38 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 90e6c16d5..17fac88c7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -550,24 +550,24 @@ services: # - RESQUE_WEB_HTTP_BASIC_AUTH_USER=user # - RESQUE_WEB_HTTP_BASIC_AUTH_PASSWORD=password - # chronograf: - # image: chronograf:1.5 - # container_name: appwrite-chronograf - # restart: unless-stopped - # networks: - # - appwrite - # volumes: - # - appwrite-chronograf:/var/lib/chronograf - # ports: - # - "8888:8888" - # environment: - # - INFLUXDB_URL=http://influxdb:8086 - # - KAPACITOR_URL=http://kapacitor:9092 - # - AUTH_DURATION=48h - # - TOKEN_SECRET=duperduper5674829!jwt - # - GH_CLIENT_ID=d86f7145a41eacfc52cc - # - GH_CLIENT_SECRET=9e0081062367a2134e7f2ea95ba1a32d08b6c8ab - # - GH_ORGS=appwrite + chronograf: + image: chronograf:1.5 + container_name: appwrite-chronograf + restart: unless-stopped + networks: + - appwrite + volumes: + - appwrite-chronograf:/var/lib/chronograf + ports: + - "8888:8888" + environment: + - INFLUXDB_URL=http://influxdb:8086 + - KAPACITOR_URL=http://kapacitor:9092 + - AUTH_DURATION=48h + - TOKEN_SECRET=duperduper5674829!jwt + - GH_CLIENT_ID=d86f7145a41eacfc52cc + - GH_CLIENT_SECRET=9e0081062367a2134e7f2ea95ba1a32d08b6c8ab + - GH_ORGS=appwrite # webgrind: # image: 'jokkedk/webgrind:latest' @@ -591,4 +591,4 @@ volumes: appwrite-functions: appwrite-influxdb: appwrite-config: - # appwrite-chronograf: + appwrite-chronograf: diff --git a/src/Appwrite/Realtime/Server.php b/src/Appwrite/Realtime/Server.php index 0eed3f24b..d8baec585 100644 --- a/src/Appwrite/Realtime/Server.php +++ b/src/Appwrite/Realtime/Server.php @@ -55,10 +55,11 @@ class Server /** * This is executed when the Realtime server starts. + * * @param SwooleServer $server * @return void */ - public function onStart(SwooleServer $server): void + private function onStart(SwooleServer $server): void { Console::success('Server started succefully'); Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}"); @@ -101,12 +102,13 @@ class Server /** * This is executed when a WebSocket worker process starts. + * * @param SwooleServer $server * @param int $workerId * @return void * @throws Exception */ - public function onWorkerStart(SwooleServer $server, int $workerId): void + private function onWorkerStart(SwooleServer $server, int $workerId): void { Console::success('Worker ' . $workerId . ' started succefully'); @@ -163,7 +165,7 @@ class Server * @throws Exception * @throws UtopiaException */ - public function onOpen(SwooleServer $server, Request $request): void + private function onOpen(SwooleServer $server, Request $request): void { $app = new App('UTC'); $connection = $request->fd; @@ -284,11 +286,12 @@ class Server /** * This is executed when a message is received by the Realtime server. + * * @param SwooleServer $server * @param Frame $frame * @return void */ - public function onMessage(SwooleServer $server, Frame $frame) + private function onMessage(SwooleServer $server, Frame $frame) { $server->push($frame->fd, 'Sending messages is not allowed.'); $server->close($frame->fd); @@ -296,11 +299,12 @@ class Server /** * This is executed when a Realtime connection is closed. + * * @param SwooleServer $server * @param int $connection * @return void */ - public function onClose(SwooleServer $server, int $connection) + private function onClose(SwooleServer $server, int $connection) { if (array_key_exists($connection, $this->connections)) { $this->stats->decr($this->connections[$connection]['projectId'], 'connectionsTotal'); @@ -311,25 +315,25 @@ class Server /** * This is executed when an event is published on realtime channel in Redis. + * + * Supported Resources: + * - Collection + * - Document + * - File + * - Account + * - Session + * - Team? (not implemented yet) + * - Membership? (not implemented yet) + * - Function + * - Execution + * * @param string $payload * @param SwooleServer $server * @param int $workerId * @return void */ - public function onRedisPublish(string $payload, SwooleServer &$server, int $workerId) + private function onRedisPublish(string $payload, SwooleServer &$server, int $workerId) { - /** - * Supported Resources: - * - Collection - * - Document - * - File - * - Account - * - Session - * - Team? (not implemented yet) - * - Membership? (not implemented yet) - * - Function - * - Execution - */ $event = json_decode($payload, true); $receivers = Parser::identifyReceivers($event, $this->subscriptions); @@ -361,10 +365,11 @@ class Server /** * This sends the usage to the `console` channel. + * * @param SwooleServer $server * @return void */ - public function tickSendProjectUsage(SwooleServer &$server) + private function tickSendProjectUsage(SwooleServer &$server) { if ( array_key_exists('console', $this->subscriptions) From 82b3a56d26a2a23e0403e34b0b1d1ae57bcf2628 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 15 Jun 2021 10:41:02 +0200 Subject: [PATCH 03/15] revert(realtime): make access modifiers public again --- src/Appwrite/Realtime/Server.php | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/Appwrite/Realtime/Server.php b/src/Appwrite/Realtime/Server.php index d8baec585..abc849ec9 100644 --- a/src/Appwrite/Realtime/Server.php +++ b/src/Appwrite/Realtime/Server.php @@ -24,11 +24,11 @@ use Utopia\Swoole\Request as SwooleRequest; class Server { - private Registry $register; - private SwooleServer $server; - private Table $stats; - private array $subscriptions; - private array $connections; + public Registry $register; + public SwooleServer $server; + public Table $stats; + public array $subscriptions; + public array $connections; public function __construct(Registry &$register, $host = '0.0.0.0', $port = 80, $config = []) { @@ -59,7 +59,7 @@ class Server * @param SwooleServer $server * @return void */ - private function onStart(SwooleServer $server): void + public function onStart(SwooleServer $server): void { Console::success('Server started succefully'); Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}"); @@ -108,7 +108,7 @@ class Server * @return void * @throws Exception */ - private function onWorkerStart(SwooleServer $server, int $workerId): void + public function onWorkerStart(SwooleServer $server, int $workerId): void { Console::success('Worker ' . $workerId . ' started succefully'); @@ -165,7 +165,7 @@ class Server * @throws Exception * @throws UtopiaException */ - private function onOpen(SwooleServer $server, Request $request): void + public function onOpen(SwooleServer $server, Request $request): void { $app = new App('UTC'); $connection = $request->fd; @@ -291,7 +291,7 @@ class Server * @param Frame $frame * @return void */ - private function onMessage(SwooleServer $server, Frame $frame) + public function onMessage(SwooleServer $server, Frame $frame) { $server->push($frame->fd, 'Sending messages is not allowed.'); $server->close($frame->fd); @@ -304,7 +304,7 @@ class Server * @param int $connection * @return void */ - private function onClose(SwooleServer $server, int $connection) + public function onClose(SwooleServer $server, int $connection) { if (array_key_exists($connection, $this->connections)) { $this->stats->decr($this->connections[$connection]['projectId'], 'connectionsTotal'); @@ -332,7 +332,7 @@ class Server * @param int $workerId * @return void */ - private function onRedisPublish(string $payload, SwooleServer &$server, int $workerId) + public function onRedisPublish(string $payload, SwooleServer &$server, int $workerId) { $event = json_decode($payload, true); @@ -369,7 +369,7 @@ class Server * @param SwooleServer $server * @return void */ - private function tickSendProjectUsage(SwooleServer &$server) + public function tickSendProjectUsage(SwooleServer &$server) { if ( array_key_exists('console', $this->subscriptions) From 7adcdf916f7d799c7bd407f3d6d346b3b0475649 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 15 Jun 2021 11:14:10 +0200 Subject: [PATCH 04/15] fix(realtime): class and phpdocs --- src/Appwrite/Realtime/Server.php | 46 ++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/Appwrite/Realtime/Server.php b/src/Appwrite/Realtime/Server.php index abc849ec9..e11fb4332 100644 --- a/src/Appwrite/Realtime/Server.php +++ b/src/Appwrite/Realtime/Server.php @@ -2,10 +2,14 @@ namespace Appwrite\Realtime; +use Appwrite\Database\Database; +use Appwrite\Database\Adapter\MySQL as MySQLAdapter; +use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Event\Event; use Appwrite\Network\Validator\Origin; use Appwrite\Utopia\Response; use Exception; +use Swoole\Coroutine\Redis; use Swoole\Http\Request; use Swoole\Http\Response as SwooleResponse; use Swoole\Process; @@ -17,6 +21,7 @@ use Utopia\Abuse\Abuse; use Utopia\Abuse\Adapters\TimeLimit; use Utopia\App; use Utopia\CLI\Console; +use Utopia\Config\Config; use Utopia\Exception as UtopiaException; use Utopia\Registry\Registry; use Utopia\Swoole\Request as SwooleRequest; @@ -24,10 +29,46 @@ use Utopia\Swoole\Request as SwooleRequest; class Server { + /** + * Container scoped Registry. + * @var Registry + */ public Registry $register; + + /** + * Container scoped Swoole Server. + * @var SwooleServer + */ public SwooleServer $server; + + /** + * Container scoped Table. + * @var Table + */ public Table $stats; + + /** + * Container scoped Database connection. + * @var Database + */ + public Database $db; + + /** + * Container scoped Redis connection. + * @var Redis + */ + public Redis $cache; + + /** + * Worker scoped subscription. + * @var array + */ public array $subscriptions; + + /** + * Worker scoped connections. + * @var array + */ public array $connections; public function __construct(Registry &$register, $host = '0.0.0.0', $port = 80, $config = []) @@ -43,6 +84,11 @@ class Server $this->stats->column('messages', Table::TYPE_INT); $this->stats->create(); + $this->db = new Database(); + $this->db->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register)); + $this->db->setNamespace('app_console'); + $this->db->setMocks(Config::getParam('collections', [])); + $this->server = new SwooleServer($host, $port, SWOOLE_PROCESS); $this->server->set($config); $this->server->on('start', [$this, 'onStart']); From 4847a0b674429eb5c54d1d9aa41063cb1d5d7a90 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 15 Jun 2021 17:11:07 +0200 Subject: [PATCH 05/15] feat(realtime): shared usage stats over database --- app/realtime.php | 26 ++++++++ src/Appwrite/Realtime/Server.php | 103 ++++++++++++++++++++++++++----- 2 files changed, 113 insertions(+), 16 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 6d94b2cc4..23fb699a3 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -1,6 +1,8 @@ 64000 // Default maximum Package Size (64kb) ]; +$register->set('db', function () { + $dbHost = App::getEnv('_APP_DB_HOST', ''); + $dbUser = App::getEnv('_APP_DB_USER', ''); + $dbPass = App::getEnv('_APP_DB_PASS', ''); + $dbScheme = App::getEnv('_APP_DB_SCHEMA', ''); + + $pdo = new PDO("mysql:host={$dbHost};dbname={$dbScheme};charset=utf8mb4", $dbUser, $dbPass, array( + PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4', + PDO::ATTR_TIMEOUT => 3, // Seconds + PDO::ATTR_PERSISTENT => true, + PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC, + PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, + )); + + return $pdo; +}); +$register->set('cache', function () { // Register cache connection + $redis = new Redis(); + $redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); + + return $redis; +}); + $realtimeServer = new Server($register, config: $config); diff --git a/src/Appwrite/Realtime/Server.php b/src/Appwrite/Realtime/Server.php index e11fb4332..b397bc066 100644 --- a/src/Appwrite/Realtime/Server.php +++ b/src/Appwrite/Realtime/Server.php @@ -5,6 +5,7 @@ namespace Appwrite\Realtime; use Appwrite\Database\Database; use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Adapter\Redis as RedisAdapter; +use Appwrite\Database\Validator\Authorization; use Appwrite\Event\Event; use Appwrite\Network\Validator\Origin; use Appwrite\Utopia\Response; @@ -42,10 +43,10 @@ class Server public SwooleServer $server; /** - * Container scoped Table. + * Container scoped Usage Table. * @var Table */ - public Table $stats; + public Table $usage; /** * Container scoped Database connection. @@ -77,12 +78,12 @@ class Server $this->connections = []; $this->register = $register; - $this->stats = new Table(4096, 1); - $this->stats->column('projectId', Table::TYPE_STRING, 64); - $this->stats->column('connections', Table::TYPE_INT); - $this->stats->column('connectionsTotal', Table::TYPE_INT); - $this->stats->column('messages', Table::TYPE_INT); - $this->stats->create(); + $this->usage = new Table(4096, 1); + $this->usage->column('projectId', Table::TYPE_STRING, 64); + $this->usage->column('connections', Table::TYPE_INT); + $this->usage->column('connectionsTotal', Table::TYPE_INT); + $this->usage->column('messages', Table::TYPE_INT); + $this->usage->create(); $this->db = new Database(); $this->db->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register)); @@ -96,6 +97,7 @@ class Server $this->server->on('open', [$this, 'onOpen']); $this->server->on('message', [$this, 'onMessage']); $this->server->on('close', [$this, 'onClose']); + $this->server->container_id = uniqid(); $this->server->start(); } @@ -110,9 +112,34 @@ class Server Console::success('Server started succefully'); Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}"); + try { + go(function() { + $document = [ + '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $this->server->container_id, + 'timestamp' => time(), + 'data' => '{}' + ]; + Authorization::disable(); + $document = $this->db->createDocument($document); + Authorization::enable(); + $this->server->document_id = $document->getId(); + }); + } catch (\Throwable $th) { + Console::error('[Error] Type: '.get_class($th)); + Console::error('[Error] Message: '.$th->getMessage()); + Console::error('[Error] File: '.$th->getFile()); + Console::error('[Error] Line: '.$th->getLine()); + } + + Timer::tick(10000, function () { /** @var Table $stats */ - foreach ($this->stats as $projectId => $value) { + foreach ($this->usage as $projectId => $value) { if (empty($value['connections']) && empty($value['messages'])) { continue; } @@ -128,7 +155,7 @@ class Server ->setParam('networkRequestSize', 0) ->setParam('networkResponseSize', 0); - $this->stats->set($projectId, [ + $this->usage->set($projectId, [ 'projectId' => $projectId, 'messages' => 0, 'connections' => 0 @@ -140,6 +167,37 @@ class Server } }); + Timer::tick(10000, function () { + $payload = []; + foreach ($this->usage as $projectId => $value) { + if (!empty($value['connectionsTotal'])) { + $payload[$projectId] = $value['connectionsTotal']; + } + } + if (empty($payload)){ + return; + } + $document = [ + '$id' => $this->server->document_id, + '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $this->server->container_id, + 'timestamp' => time(), + 'data' => json_encode($payload) + ]; + try { + $document = $this->db->updateDocument($document); + } catch (\Throwable $th) { + Console::error('[Error] Type: '.get_class($th)); + Console::error('[Error] Message: '.$th->getMessage()); + Console::error('[Error] File: '.$th->getFile()); + Console::error('[Error] Line: '.$th->getLine()); + } + }); + Process::signal(2, function () use ($server) { Console::log('Stop by Ctrl+C'); $server->shutdown(); @@ -302,8 +360,8 @@ class Server $server->push($connection, json_encode($channels)); - $this->stats->incr($project->getId(), 'connections'); - $this->stats->incr($project->getId(), 'connectionsTotal'); + $this->usage->incr($project->getId(), 'connections'); + $this->usage->incr($project->getId(), 'connectionsTotal'); } catch (\Throwable $th) { $response = [ 'code' => $th->getCode(), @@ -353,7 +411,7 @@ class Server public function onClose(SwooleServer $server, int $connection) { if (array_key_exists($connection, $this->connections)) { - $this->stats->decr($this->connections[$connection]['projectId'], 'connectionsTotal'); + $this->usage->decr($this->connections[$connection]['projectId'], 'connectionsTotal'); } Parser::unsubscribe($connection, $this->subscriptions, $this->connections); Console::info('Connection close: ' . $connection); @@ -405,7 +463,7 @@ class Server } } if (($num = count($receivers)) > 0) { - $this->stats->incr($event['project'], 'messages', $num); + $this->usage->incr($event['project'], 'messages', $num); } } @@ -423,8 +481,21 @@ class Server && array_key_exists('project', $this->subscriptions['console']['role:member']) ) { $payload = []; - foreach ($this->stats as $projectId => $value) { - $payload[$projectId] = $value['connectionsTotal']; + $list = $this->db->getCollection([ + 'filters' => [ + '$collection='.Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + 'timestamp>'.(time() - 15) + ], + ]); + + foreach ($list as $document) { + foreach (json_decode($document->getAttribute('data')) as $projectId => $value) { + if (array_key_exists($projectId, $payload)) { + $payload[$projectId] += $value; + } else { + $payload[$projectId] = $value; + } + } } foreach ($this->subscriptions['console']['role:member']['project'] as $connection => $value) { $server->push( From e281ea08c6175201ea951e6d6a8965e9a0c4f4e9 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 15 Jun 2021 17:14:48 +0200 Subject: [PATCH 06/15] revert(docker): adding chronograf --- docker-compose.yml | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 17fac88c7..90e6c16d5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -550,24 +550,24 @@ services: # - RESQUE_WEB_HTTP_BASIC_AUTH_USER=user # - RESQUE_WEB_HTTP_BASIC_AUTH_PASSWORD=password - chronograf: - image: chronograf:1.5 - container_name: appwrite-chronograf - restart: unless-stopped - networks: - - appwrite - volumes: - - appwrite-chronograf:/var/lib/chronograf - ports: - - "8888:8888" - environment: - - INFLUXDB_URL=http://influxdb:8086 - - KAPACITOR_URL=http://kapacitor:9092 - - AUTH_DURATION=48h - - TOKEN_SECRET=duperduper5674829!jwt - - GH_CLIENT_ID=d86f7145a41eacfc52cc - - GH_CLIENT_SECRET=9e0081062367a2134e7f2ea95ba1a32d08b6c8ab - - GH_ORGS=appwrite + # chronograf: + # image: chronograf:1.5 + # container_name: appwrite-chronograf + # restart: unless-stopped + # networks: + # - appwrite + # volumes: + # - appwrite-chronograf:/var/lib/chronograf + # ports: + # - "8888:8888" + # environment: + # - INFLUXDB_URL=http://influxdb:8086 + # - KAPACITOR_URL=http://kapacitor:9092 + # - AUTH_DURATION=48h + # - TOKEN_SECRET=duperduper5674829!jwt + # - GH_CLIENT_ID=d86f7145a41eacfc52cc + # - GH_CLIENT_SECRET=9e0081062367a2134e7f2ea95ba1a32d08b6c8ab + # - GH_ORGS=appwrite # webgrind: # image: 'jokkedk/webgrind:latest' @@ -591,4 +591,4 @@ volumes: appwrite-functions: appwrite-influxdb: appwrite-config: - appwrite-chronograf: + # appwrite-chronograf: From 554d3c355749039230ebc1f15bb31f280ce6db9f Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Wed, 16 Jun 2021 11:35:37 +0200 Subject: [PATCH 07/15] feat(maintenance): add realtime usage stats --- app/init.php | 1 + app/tasks/maintenance.php | 9 +++++++++ app/workers/deletes.php | 17 +++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/app/init.php b/app/init.php index cb87697a8..ad1a9bd3a 100644 --- a/app/init.php +++ b/app/init.php @@ -69,6 +69,7 @@ const DELETE_TYPE_EXECUTIONS = 'executions'; const DELETE_TYPE_AUDIT = 'audit'; const DELETE_TYPE_ABUSE = 'abuse'; const DELETE_TYPE_CERTIFICATES = 'certificates'; +const DELETE_TYPE_REALTIME = 'realtime'; // Auth Types const APP_AUTH_TYPE_SESSION = 'Session'; const APP_AUTH_TYPE_JWT = 'JWT'; diff --git a/app/tasks/maintenance.php b/app/tasks/maintenance.php index eccdf61b1..a4db6ac4e 100644 --- a/app/tasks/maintenance.php +++ b/app/tasks/maintenance.php @@ -39,6 +39,14 @@ $cli ]); } + function notifyDeleteRealtimeUsage() + { + Resque::enqueue(Event::DELETE_QUEUE_NAME, Event::DELETE_CLASS_NAME, [ + 'type' => DELETE_TYPE_REALTIME, + 'timestamp' => time() - 60 + ]); + } + // # of days in seconds (1 day = 86400s) $interval = (int) App::getEnv('_APP_MAINTENANCE_INTERVAL', '86400'); $executionLogsRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', '1209600'); @@ -51,5 +59,6 @@ $cli notifyDeleteExecutionLogs($executionLogsRetention); notifyDeleteAbuseLogs($abuseLogsRetention); notifyDeleteAuditLogs($auditLogRetention); + notifyDeleteRealtimeUsage(); }, $interval); }); \ No newline at end of file diff --git a/app/workers/deletes.php b/app/workers/deletes.php index 63551e90f..0080d35f3 100644 --- a/app/workers/deletes.php +++ b/app/workers/deletes.php @@ -69,6 +69,10 @@ class DeletesV1 extends Worker $this->deleteAbuseLogs($this->args['timestamp']); break; + case DELETE_TYPE_REALTIME: + $this->deleteRealtimeUsage($this->args['timestamp']); + break; + case DELETE_TYPE_CERTIFICATES: $document = new Document($this->args['document']); $this->deleteCertificates($document); @@ -197,6 +201,19 @@ class DeletesV1 extends Worker }); } + protected function deleteRealtimeUsage($timestamp) + { + if (!($consoleDB = $this->getConsoleDB())) { + throw new Exception('Failed to get consoleDb.'); + } + // Delete Dead Realtime Logs + $this->deleteByGroup([ + '$collection='.Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + 'timestamp<'.$timestamp + ], $consoleDB); + + } + protected function deleteFunction(Document $document, $projectId) { $projectDB = $this->getProjectDB($projectId); From ee0c9e5b8177afaf355c6bc837228885c09f37c3 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 17 Jun 2021 14:11:48 +0200 Subject: [PATCH 08/15] remove unnecessary db instances --- src/Appwrite/Realtime/Server.php | 47 ++++++++++++++------------------ 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/src/Appwrite/Realtime/Server.php b/src/Appwrite/Realtime/Server.php index 22a0b2712..64460e7ad 100644 --- a/src/Appwrite/Realtime/Server.php +++ b/src/Appwrite/Realtime/Server.php @@ -52,7 +52,13 @@ class Server * Container scoped Database connection. * @var Database */ - public Database $db; + public Database $consoleDb; + + /** + * Container scoped Database connection. + * @var Database + */ + public Database $projectDb; /** * Container scoped Redis connection. @@ -85,10 +91,14 @@ class Server $this->usage->column('messages', Table::TYPE_INT); $this->usage->create(); - $this->db = new Database(); - $this->db->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register)); - $this->db->setNamespace('app_console'); - $this->db->setMocks(Config::getParam('collections', [])); + $this->consoleDb = new Database(); + $this->consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register)); + $this->consoleDb->setNamespace('app_console'); + $this->consoleDb->setMocks(Config::getParam('collections', [])); + + $this->projectDb = new Database(); + $this->projectDb->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register)); + $this->projectDb->setMocks(Config::getParam('collections', [])); $this->server = new SwooleServer($host, $port, SWOOLE_PROCESS); $this->server->set($config); @@ -125,7 +135,7 @@ class Server 'data' => '{}' ]; Authorization::disable(); - $document = $this->db->createDocument($document); + $document = $this->consoleDb->createDocument($document); Authorization::enable(); $this->server->document_id = $document->getId(); }); @@ -189,7 +199,7 @@ class Server 'data' => json_encode($payload) ]; try { - $document = $this->db->updateDocument($document); + $document = $this->consoleDb->updateDocument($document); } catch (\Throwable $th) { Console::error('[Error] Type: '.get_class($th)); Console::error('[Error] Message: '.$th->getMessage()); @@ -485,7 +495,7 @@ class Server && array_key_exists('project', $this->subscriptions['console']['role:member']) ) { $payload = []; - $list = $this->db->getCollection([ + $list = $this->consoleDb->getCollection([ 'filters' => [ '$collection='.Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, 'timestamp>'.(time() - 15) @@ -528,26 +538,9 @@ class Server return; } - /** - * This is redundant soon and will be gone with merging the usage branch. - */ - $db = $this->register->get('dbPool')->get(); - $redis = $this->register->get('redisPool')->get(); + $this->projectDb->setNamespace('app_'.$project); - $this->register->set('db', function () use (&$db) { - return $db; - }); - - $this->register->set('cache', function () use (&$redis) { - return $redis; - }); - - $projectDB = new Database(); - $projectDB->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register)); - $projectDB->setNamespace('app_'.$project); - $projectDB->setMocks(Config::getParam('collections', [])); - - $user = $projectDB->getDocument($userId); + $user = $this->projectDb->getDocument($userId); Parser::setUser($user); From a3cc8c9ad23110a42f32234436499d6c86e057f4 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Fri, 18 Jun 2021 12:00:27 +0200 Subject: [PATCH 09/15] adapt to review --- app/config/collections.php | 14 +++++++------- app/tasks/maintenance.php | 4 ++-- src/Appwrite/Database/Database.php | 2 +- src/Appwrite/Realtime/Server.php | 14 ++++++++------ 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/app/config/collections.php b/app/config/collections.php index d1d623079..62f6a41bb 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -1698,15 +1698,15 @@ $collections = [ ], ], ], - Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS => [ + Database::SYSTEM_COLLECTION_CONNECTIONS => [ '$collection' => Database::SYSTEM_COLLECTION_COLLECTIONS, - '$id' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$id' => Database::SYSTEM_COLLECTION_CONNECTIONS, '$permissions' => ['read' => ['*']], 'name' => 'Realtime Connections', 'structure' => true, 'rules' => [ [ - '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, 'label' => 'Container', 'key' => 'container', 'type' => Database::SYSTEM_VAR_TYPE_TEXT, @@ -1714,7 +1714,7 @@ $collections = [ 'array' => false, ], [ - '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, 'label' => 'Timestamp', 'key' => 'timestamp', 'type' => Database::SYSTEM_VAR_TYPE_NUMERIC, @@ -1722,9 +1722,9 @@ $collections = [ 'array' => false, ], [ - '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, - 'label' => 'Data', - 'key' => 'data', + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + 'label' => 'Value', + 'key' => 'value', 'type' => Database::SYSTEM_VAR_TYPE_TEXT, 'required' => true, 'array' => false, diff --git a/app/tasks/maintenance.php b/app/tasks/maintenance.php index a4db6ac4e..e36c98083 100644 --- a/app/tasks/maintenance.php +++ b/app/tasks/maintenance.php @@ -39,7 +39,7 @@ $cli ]); } - function notifyDeleteRealtimeUsage() + function notifyDeleteConnections() { Resque::enqueue(Event::DELETE_QUEUE_NAME, Event::DELETE_CLASS_NAME, [ 'type' => DELETE_TYPE_REALTIME, @@ -59,6 +59,6 @@ $cli notifyDeleteExecutionLogs($executionLogsRetention); notifyDeleteAbuseLogs($abuseLogsRetention); notifyDeleteAuditLogs($auditLogRetention); - notifyDeleteRealtimeUsage(); + notifyDeleteConnections(); }, $interval); }); \ No newline at end of file diff --git a/src/Appwrite/Database/Database.php b/src/Appwrite/Database/Database.php index f0fbfc2ef..09a66ee72 100644 --- a/src/Appwrite/Database/Database.php +++ b/src/Appwrite/Database/Database.php @@ -43,7 +43,7 @@ class Database const SYSTEM_COLLECTION_EXECUTIONS = 'executions'; // Realtime - const SYSTEM_COLLECTION_REALTIME_CONNECTIONS = 'realtimeConnections'; + const SYSTEM_COLLECTION_CONNECTIONS = 'connections'; // Var Types const SYSTEM_VAR_TYPE_TEXT = 'text'; diff --git a/src/Appwrite/Realtime/Server.php b/src/Appwrite/Realtime/Server.php index 64460e7ad..4fd7623a8 100644 --- a/src/Appwrite/Realtime/Server.php +++ b/src/Appwrite/Realtime/Server.php @@ -125,14 +125,14 @@ class Server try { go(function() { $document = [ - '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, '$permissions' => [ 'read' => ['*'], 'write' => ['*'], ], 'container' => $this->server->container_id, 'timestamp' => time(), - 'data' => '{}' + 'value' => '{}' ]; Authorization::disable(); $document = $this->consoleDb->createDocument($document); @@ -147,6 +147,7 @@ class Server } + // Run ever 10 seconds Timer::tick(10000, function () { /** @var Table $stats */ foreach ($this->usage as $projectId => $value) { @@ -177,6 +178,7 @@ class Server } }); + // Run ever 10 seconds Timer::tick(10000, function () { $payload = []; foreach ($this->usage as $projectId => $value) { @@ -189,14 +191,14 @@ class Server } $document = [ '$id' => $this->server->document_id, - '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, '$permissions' => [ 'read' => ['*'], 'write' => ['*'], ], 'container' => $this->server->container_id, 'timestamp' => time(), - 'data' => json_encode($payload) + 'value' => json_encode($payload) ]; try { $document = $this->consoleDb->updateDocument($document); @@ -497,13 +499,13 @@ class Server $payload = []; $list = $this->consoleDb->getCollection([ 'filters' => [ - '$collection='.Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$collection='.Database::SYSTEM_COLLECTION_CONNECTIONS, 'timestamp>'.(time() - 15) ], ]); foreach ($list as $document) { - foreach (json_decode($document->getAttribute('data')) as $projectId => $value) { + foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { if (array_key_exists($projectId, $payload)) { $payload[$projectId] += $value; } else { From b9d95f769cb3dc1e1f73670d28457af898403fdd Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 1 Jul 2021 12:31:48 +0200 Subject: [PATCH 10/15] fix after rebase --- app/realtime.php | 140 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 128 insertions(+), 12 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index a39b8ea45..88abbc20d 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -4,6 +4,7 @@ use Appwrite\Auth\Auth; use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Database; +use Appwrite\Database\Validator\Authorization; use Appwrite\Event\Event; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; @@ -39,14 +40,70 @@ $stats->column('connectionsTotal', Table::TYPE_INT); $stats->column('messages', Table::TYPE_INT); $stats->create(); +$containerId = uniqid(); +$documentId = null; + $server = new Server($adapter); $realtime = new Realtime(); -$server->onStart(function () use ($stats) { +$server->onStart(function () use ($stats, $register, $containerId, &$documentId) { Console::success('Server started succefully'); - Timer::tick(10000, function () use ($stats) { + $getConsoleDb = function () use ($register) { + $db = $register->get('dbPool')->get(); + $cache = $register->get('redisPool')->get(); + + $consoleDb = new Database(); + $consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); + $consoleDb->setNamespace('app_console'); + $consoleDb->setMocks(Config::getParam('collections', [])); + + return [ + $consoleDb, + function () use ($register, $db, $cache) { + $register->get('dbPool')->put($db); + $register->get('redisPool')->put($cache); + } + ]; + }; + + /** + * Create document for this worker for connection stats across Containers. + */ + go(function () use ($getConsoleDb, $containerId, &$documentId) { + try { + [$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb); + $document = [ + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $containerId, + 'timestamp' => time(), + 'value' => '{}' + ]; + Authorization::disable(); + $document = $consoleDb->createDocument($document); + Authorization::enable(); + $documentId = $document->getId(); + } catch (\Throwable $th) { + Console::error('[Error] Type: ' . get_class($th)); + Console::error('[Error] Message: ' . $th->getMessage()); + Console::error('[Error] File: ' . $th->getFile()); + Console::error('[Error] Line: ' . $th->getLine()); + } finally { + call_user_func($returnConsoleDb); + } + }); + + /** + * Save current connections to the Database every 5 seconds. + */ + Timer::tick(5000, function () use ($stats, $getConsoleDb, $containerId, &$documentId) { + [$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb); + foreach ($stats as $projectId => $value) { if (empty($value['connections']) && empty($value['messages'])) { continue; @@ -73,6 +130,36 @@ $server->onStart(function () use ($stats) { $usage->trigger(); } } + $payload = []; + foreach ($stats as $projectId => $value) { + if (!empty($value['connectionsTotal'])) { + $payload[$projectId] = $value['connectionsTotal']; + } + } + if (empty($payload)) { + return; + } + $document = [ + '$id' => $documentId, + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $containerId, + 'timestamp' => time(), + 'value' => json_encode($payload) + ]; + try { + $document = $consoleDb->updateDocument($document); + } catch (\Throwable $th) { + Console::error('[Error] Type: ' . get_class($th)); + Console::error('[Error] Message: ' . $th->getMessage()); + Console::error('[Error] File: ' . $th->getFile()); + Console::error('[Error] Line: ' . $th->getLine()); + } finally { + call_user_func($returnConsoleDb); + } }); }); @@ -81,27 +168,56 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $attempts = 0; $start = time(); - $redisPool = $register->get('redisPool'); /** * Sending current connections to project channels on the console project every 5 seconds. */ - Timer::tick(5000, function () use ($server, $stats, $realtime) { + Timer::tick(5000, function () use ($server, $register, $realtime) { if ($realtime->hasSubscriber('console', 'role:member', 'project')) { + $db = $register->get('dbPool')->get(); + $cache = $register->get('redisPool')->get(); + + $consoleDb = new Database(); + $consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); + $consoleDb->setNamespace('app_console'); + $consoleDb->setMocks(Config::getParam('collections', [])); + + $projectDb = new Database(); + $projectDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); + $projectDb->setMocks(Config::getParam('collections', [])); + $payload = []; - foreach ($stats as $projectId => $value) { - $payload[$projectId] = $value['connectionsTotal']; + $list = $consoleDb->getCollection([ + 'filters' => [ + '$collection=' . Database::SYSTEM_COLLECTION_CONNECTIONS, + 'timestamp>' . (time() - 15) + ], + ]); + + foreach ($list as $document) { + foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { + if (array_key_exists($projectId, $payload)) { + $payload[$projectId] += $value; + } else { + $payload[$projectId] = $value; + } + } } $event = [ - 'event' => 'stats.connections', - 'channels' => ['project'], + 'project' => 'console', 'permissions' => ['role:member'], - 'timestamp' => time(), - 'payload' => $payload + 'data' => [ + 'event' => 'stats.connections', + 'channels' => ['project'], + 'timestamp' => time(), + 'payload' => $payload + ] ]; $server->send($realtime->getReceivers($event), json_encode($event)); + $register->get('dbPool')->put($db); + $register->get('redisPool')->put($cache); } }); @@ -115,7 +231,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $start = time(); /** @var Redis $redis */ - $redis = $redisPool->get(); + $redis = $register->get('redisPool')->get(); $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -177,7 +293,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, }); } catch (\Throwable $th) { Console::error('Pub/sub error: ' . $th->getMessage()); - $redisPool->put($redis); + $register->get('redisPool')->put($redis); $attempts++; continue; } From fefd82680c8d1a4f0863fd18f8c76ee4fae63f6f Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 17 Aug 2021 11:08:18 +0200 Subject: [PATCH 11/15] fix(realtime): whitespace --- app/realtime.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/realtime.php b/app/realtime.php index 779eb590e..f4a552ed3 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -67,7 +67,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$documentId) } ]; }; - + /** * Create document for this worker for connection stats across Containers. */ From 22f611da2935196746cee68f51e39ca85d3d399c Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Wed, 18 Aug 2021 17:44:11 +0200 Subject: [PATCH 12/15] fix comments --- app/realtime.php | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index ad9eb27e3..129c28272 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -27,12 +27,11 @@ require_once __DIR__ . '/init.php'; Runtime::enableCoroutine(SWOOLE_HOOK_ALL); -$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80)); -$adapter->setPackageMaxLength(64000); // Default maximum Package Size (64kb) - -$subscriptions = []; -$connections = []; +$realtime = new Realtime(); +/** + * Table for statistics across all workers. + */ $stats = new Table(4096, 1); $stats->column('projectId', Table::TYPE_STRING, 64); $stats->column('connections', Table::TYPE_INT); @@ -43,9 +42,10 @@ $stats->create(); $containerId = uniqid(); $documentId = null; -$server = new Server($adapter); +$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80)); +$adapter->setPackageMaxLength(64000); // Default maximum Package Size (64kb) -$realtime = new Realtime(); +$server = new Server($adapter); $server->onStart(function () use ($stats, $register, $containerId, &$documentId) { Console::success('Server started succefully'); @@ -69,7 +69,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$documentId) }; /** - * Create document for this worker for connection stats across Containers. + * Create document for this worker to share stats across Containers. */ go(function () use ($getConsoleDb, $containerId, &$documentId) { try { @@ -194,6 +194,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, ], ]); + /** + * Aggregate stats across containers. + */ foreach ($list as $document) { foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { if (array_key_exists($projectId, $payload)) { From f73dd1d64aa7434914fa6de03732fc4874163b08 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 19 Aug 2021 10:03:52 +0200 Subject: [PATCH 13/15] fix(realtime): polish code --- app/realtime.php | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 129c28272..67a1d68e3 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -182,10 +182,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $consoleDb->setNamespace('app_console'); $consoleDb->setMocks(Config::getParam('collections', [])); - $projectDb = new Database(); - $projectDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); - $projectDb->setMocks(Config::getParam('collections', [])); - $payload = []; $list = $consoleDb->getCollection([ 'filters' => [ @@ -219,6 +215,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, ]; $server->send($realtime->getSubscribers($event), json_encode($event['data'])); + + $register->get('dbPool')->put($db); + $register->get('redisPool')->put($cache); } /** * Sending test message for SDK E2E tests every 5 seconds. @@ -294,9 +293,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $receivers = $realtime->getSubscribers($event); - // Temporarily print debug logs by default for Alpha testing. - // if (App::isDevelopment() && !empty($receivers)) { - if (!empty($receivers)) { + if (App::isDevelopment() && !empty($receivers)) { Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers)); Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers)); Console::log("[Debug][Worker {$workerId}] Event: " . $payload); @@ -419,15 +416,16 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, 'code' => $th->getCode(), 'message' => $th->getMessage() ]; - // Temporarily print debug logs by default for Alpha testing. - //if (App::isDevelopment()) { - Console::error("[Error] Connection Error"); - Console::error("[Error] Code: " . $response['code']); - Console::error("[Error] Message: " . $response['message']); - //} + $server->send([$connection], json_encode($response)); $server->close($connection, $th->getCode()); + if (App::isDevelopment()) { + Console::error("[Error] Connection Error"); + Console::error("[Error] Code: " . $response['code']); + Console::error("[Error] Message: " . $response['message']); + } + if ($th instanceof PDOException) { $db = null; } From cf09129cc233f66d424d481e32cc2ba564b81bd1 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 19 Aug 2021 10:24:41 +0200 Subject: [PATCH 14/15] fix(realtime): adapt to psalm --- app/realtime.php | 2 +- src/Appwrite/Messaging/Adapter.php | 4 +-- src/Appwrite/Messaging/Adapter/Realtime.php | 30 +++++++++++---------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 67a1d68e3..eb89df33b 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -260,7 +260,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, $stats, $register, $realtime) { + $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) { $event = json_decode($payload, true); if ($event['permissionsChanged'] && isset($event['userId'])) { diff --git a/src/Appwrite/Messaging/Adapter.php b/src/Appwrite/Messaging/Adapter.php index d788e34d1..6ef2d5cfd 100644 --- a/src/Appwrite/Messaging/Adapter.php +++ b/src/Appwrite/Messaging/Adapter.php @@ -4,7 +4,7 @@ namespace Appwrite\Messaging; abstract class Adapter { - public abstract function subscribe(string $project, mixed $identifier, array $roles, array $channels): void; + public abstract function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void; public abstract function unsubscribe(mixed $identifier): void; - public static abstract function send(string $projectId, array $payload, string $event, array $channels, array $permissions, array $options): void; + public static abstract function send(string $projectId, array $payload, string $event, array $channels, array $roles, array $options): void; } diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 1bd1245a9..98a9b30fa 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -35,15 +35,15 @@ class Realtime extends Adapter /** * Adds a subscribtion. - * @param string $projectId Project ID. - * @param mixed $connection Unique Identifier - Connection ID. - * @param array $roles Roles of the Subscription. - * @param array $channels Subscribed Channels. + * + * @param string $projectId + * @param mixed $identifier + * @param array $roles + * @param array $channels * @return void */ - public function subscribe(string $projectId, mixed $connection, array $roles, array $channels): void + public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void { - //TODO: merge project & channel to a single layer if (!isset($this->subscriptions[$projectId])) { // Init Project $this->subscriptions[$projectId] = []; } @@ -54,11 +54,11 @@ class Realtime extends Adapter } foreach ($channels as $channel => $list) { - $this->subscriptions[$projectId][$role][$channel][$connection] = true; + $this->subscriptions[$projectId][$role][$channel][$identifier] = true; } } - $this->connections[$connection] = [ + $this->connections[$identifier] = [ 'projectId' => $projectId, 'roles' => $roles, 'channels' => $channels @@ -119,7 +119,7 @@ class Realtime extends Adapter /** * Sends an event to the Realtime Server. - * @param string $project + * @param string $projectId * @param array $payload * @param string $event * @param array $channels @@ -127,9 +127,9 @@ class Realtime extends Adapter * @param array $options * @return void */ - public static function send(string $project, array $payload, string $event, array $channels, array $roles, array $options = []): void + public static function send(string $projectId, array $payload, string $event, array $channels, array $roles, array $options = []): void { - if (empty($channels) || empty($roles) || empty($project)) return; + if (empty($channels) || empty($roles) || empty($projectId)) return; $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; @@ -137,7 +137,7 @@ class Realtime extends Adapter $redis = new \Redis(); //TODO: make this part of the constructor $redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); $redis->publish('realtime', json_encode([ - 'project' => $project, + 'project' => $projectId, 'roles' => $roles, 'permissionsChanged' => $permissionsChanged, 'userId' => $userId, @@ -224,8 +224,10 @@ class Realtime extends Adapter /** * Create channels array based on the event name and payload. - * - * @return void + * + * @param string $event + * @param Document $payload + * @return array */ public static function fromPayload(string $event, Document $payload): array { From b56d886717fe42d8611b33739b530bd1d516781e Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 19 Aug 2021 10:34:32 +0200 Subject: [PATCH 15/15] add comments --- src/Appwrite/Messaging/Adapter/Realtime.php | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 98a9b30fa..11eca4ade 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -4,6 +4,7 @@ namespace Appwrite\Messaging\Adapter; use Appwrite\Database\Document; use Appwrite\Messaging\Adapter; +use Redis; use Utopia\App; class Realtime extends Adapter @@ -165,16 +166,34 @@ class Realtime extends Adapter */ public function getSubscribers(array $event) { - //TODO: do comments + $receivers = []; + /** + * Check if project has subscriber. + */ if (isset($this->subscriptions[$event['project']])) { + /** + * Iterate through each role. + */ foreach ($this->subscriptions[$event['project']] as $role => $subscription) { + /** + * Iterate through each channel. + */ foreach ($event['data']['channels'] as $channel) { + /** + * Check if channel has subscriber. Also taking care of the role in the event and the wildcard role. + */ if ( \array_key_exists($channel, $this->subscriptions[$event['project']][$role]) && (\in_array($role, $event['roles']) || \in_array('*', $event['roles'])) ) { + /** + * Saving all connections that are allowed to receive this event. + */ foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $id) { + /** + * To prevent duplicates, we save the connections as array keys. + */ $receivers[$id] = 0; } break;