diff --git a/app/config/collections.php b/app/config/collections.php index 39b1a03a7..dfecf3651 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -1706,6 +1706,39 @@ $collections = [ ], ], ], + Database::SYSTEM_COLLECTION_CONNECTIONS => [ + '$collection' => Database::SYSTEM_COLLECTION_COLLECTIONS, + '$id' => Database::SYSTEM_COLLECTION_CONNECTIONS, + '$permissions' => ['read' => ['*']], + 'name' => 'Realtime Connections', + 'structure' => true, + 'rules' => [ + [ + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + 'label' => 'Container', + 'key' => 'container', + 'type' => Database::SYSTEM_VAR_TYPE_TEXT, + 'required' => true, + 'array' => false, + ], + [ + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + 'label' => 'Timestamp', + 'key' => 'timestamp', + 'type' => Database::SYSTEM_VAR_TYPE_NUMERIC, + 'required' => true, + 'array' => false, + ], + [ + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + 'label' => 'Value', + 'key' => 'value', + 'type' => Database::SYSTEM_VAR_TYPE_TEXT, + 'required' => true, + 'array' => false, + ], + ], + ], Database::SYSTEM_COLLECTION_RESERVED => [ '$collection' => Database::SYSTEM_COLLECTION_COLLECTIONS, '$id' => Database::SYSTEM_COLLECTION_RESERVED, diff --git a/app/init.php b/app/init.php index 9013b397b..47654dbec 100644 --- a/app/init.php +++ b/app/init.php @@ -71,6 +71,7 @@ const DELETE_TYPE_EXECUTIONS = 'executions'; const DELETE_TYPE_AUDIT = 'audit'; const DELETE_TYPE_ABUSE = 'abuse'; const DELETE_TYPE_CERTIFICATES = 'certificates'; +const DELETE_TYPE_REALTIME = 'realtime'; // Mail Types const MAIL_TYPE_VERIFICATION = 'verification'; const MAIL_TYPE_RECOVERY = 'recovery'; diff --git a/app/realtime.php b/app/realtime.php index ea0e5a5f7..bee946dd0 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -4,6 +4,7 @@ use Appwrite\Auth\Auth; use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Database; +use Appwrite\Database\Validator\Authorization; use Appwrite\Event\Event; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; @@ -26,12 +27,11 @@ require_once __DIR__ . '/init.php'; Runtime::enableCoroutine(SWOOLE_HOOK_ALL); -$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80)); -$adapter->setPackageMaxLength(64000); // Default maximum Package Size (64kb) - -$subscriptions = []; -$connections = []; +$realtime = new Realtime(); +/** + * Table for statistics across all workers. + */ $stats = new Table(4096, 1); $stats->column('projectId', Table::TYPE_STRING, 64); $stats->column('teamId', Table::TYPE_STRING, 64); @@ -40,14 +40,71 @@ $stats->column('connectionsTotal', Table::TYPE_INT); $stats->column('messages', Table::TYPE_INT); $stats->create(); +$containerId = uniqid(); +$documentId = null; + +$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80)); +$adapter->setPackageMaxLength(64000); // Default maximum Package Size (64kb) + $server = new Server($adapter); -$realtime = new Realtime(); - -$server->onStart(function () use ($stats) { +$server->onStart(function () use ($stats, $register, $containerId, &$documentId) { Console::success('Server started succefully'); - Timer::tick(10000, function () use ($stats) { + $getConsoleDb = function () use ($register) { + $db = $register->get('dbPool')->get(); + $cache = $register->get('redisPool')->get(); + + $consoleDb = new Database(); + $consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); + $consoleDb->setNamespace('app_console'); + $consoleDb->setMocks(Config::getParam('collections', [])); + + return [ + $consoleDb, + function () use ($register, $db, $cache) { + $register->get('dbPool')->put($db); + $register->get('redisPool')->put($cache); + } + ]; + }; + + /** + * Create document for this worker to share stats across Containers. + */ + go(function () use ($getConsoleDb, $containerId, &$documentId) { + try { + [$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb); + $document = [ + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $containerId, + 'timestamp' => time(), + 'value' => '{}' + ]; + Authorization::disable(); + $document = $consoleDb->createDocument($document); + Authorization::enable(); + $documentId = $document->getId(); + } catch (\Throwable $th) { + Console::error('[Error] Type: ' . get_class($th)); + Console::error('[Error] Message: ' . $th->getMessage()); + Console::error('[Error] File: ' . $th->getFile()); + Console::error('[Error] Line: ' . $th->getLine()); + } finally { + call_user_func($returnConsoleDb); + } + }); + + /** + * Save current connections to the Database every 5 seconds. + */ + Timer::tick(5000, function () use ($stats, $getConsoleDb, $containerId, &$documentId) { + [$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb); + foreach ($stats as $projectId => $value) { if (empty($value['connections']) && empty($value['messages'])) { continue; @@ -73,6 +130,36 @@ $server->onStart(function () use ($stats) { $usage->trigger(); } } + $payload = []; + foreach ($stats as $projectId => $value) { + if (!empty($value['connectionsTotal'])) { + $payload[$projectId] = $value['connectionsTotal']; + } + } + if (empty($payload)) { + return; + } + $document = [ + '$id' => $documentId, + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $containerId, + 'timestamp' => time(), + 'value' => json_encode($payload) + ]; + try { + $document = $consoleDb->updateDocument($document); + } catch (\Throwable $th) { + Console::error('[Error] Type: ' . get_class($th)); + Console::error('[Error] Message: ' . $th->getMessage()); + Console::error('[Error] File: ' . $th->getFile()); + Console::error('[Error] Line: ' . $th->getLine()); + } finally { + call_user_func($returnConsoleDb); + } }); }); @@ -81,17 +168,42 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $attempts = 0; $start = time(); - $redisPool = $register->get('redisPool'); - Timer::tick(5000, function () use ($server, $stats, $realtime) { + Timer::tick(5000, function () use ($server, $register, $realtime, $stats) { /** * Sending current connections to project channels on the console project every 5 seconds. */ if ($realtime->hasSubscriber('console', 'role:member', 'project')) { + $db = $register->get('dbPool')->get(); + $cache = $register->get('redisPool')->get(); + + $consoleDb = new Database(); + $consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); + $consoleDb->setNamespace('app_console'); + $consoleDb->setMocks(Config::getParam('collections', [])); + + $payload = []; + $list = $consoleDb->getCollection([ + 'filters' => [ + '$collection=' . Database::SYSTEM_COLLECTION_CONNECTIONS, + 'timestamp>' . (time() - 15) + ], + ]); + + /** + * Aggregate stats across containers. + */ + foreach ($list as $document) { + foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { + if (array_key_exists($projectId, $payload)) { + $payload[$projectId] += $value; + } else { + $payload[$projectId] = $value; + } + } + } + foreach ($stats as $projectId => $value) { - $payload = [ - $projectId => $value['connectionsTotal'] - ]; $event = [ 'project' => 'console', 'roles' => ['team:'.$value['teamId']], @@ -99,12 +211,17 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, 'event' => 'stats.connections', 'channels' => ['project'], 'timestamp' => time(), - 'payload' => $payload + 'payload' => [ + $projectId => $payload[$projectId] + ] ] ]; $server->send($realtime->getSubscribers($event), json_encode($event['data'])); } + + $register->get('dbPool')->put($db); + $register->get('redisPool')->put($cache); } /** * Sending test message for SDK E2E tests every 5 seconds. @@ -137,7 +254,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $start = time(); /** @var Redis $redis */ - $redis = $redisPool->get(); + $redis = $register->get('redisPool')->get(); $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -147,7 +264,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, $stats, $register, $realtime) { + $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) { $event = json_decode($payload, true); if ($event['permissionsChanged'] && isset($event['userId'])) { @@ -180,9 +297,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $receivers = $realtime->getSubscribers($event); - // Temporarily print debug logs by default for Alpha testing. - // if (App::isDevelopment() && !empty($receivers)) { - if (!empty($receivers)) { + if (App::isDevelopment() && !empty($receivers)) { Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers)); Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers)); Console::log("[Debug][Worker {$workerId}] Event: " . $payload); @@ -199,7 +314,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, }); } catch (\Throwable $th) { Console::error('Pub/sub error: ' . $th->getMessage()); - $redisPool->put($redis); + $register->get('redisPool')->put($redis); $attempts++; continue; } @@ -309,15 +424,16 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, 'code' => $th->getCode(), 'message' => $th->getMessage() ]; - // Temporarily print debug logs by default for Alpha testing. - //if (App::isDevelopment()) { - Console::error("[Error] Connection Error"); - Console::error("[Error] Code: " . $response['code']); - Console::error("[Error] Message: " . $response['message']); - //} + $server->send([$connection], json_encode($response)); $server->close($connection, $th->getCode()); + if (App::isDevelopment()) { + Console::error("[Error] Connection Error"); + Console::error("[Error] Code: " . $response['code']); + Console::error("[Error] Message: " . $response['message']); + } + if ($th instanceof PDOException) { $db = null; } diff --git a/app/tasks/maintenance.php b/app/tasks/maintenance.php index eccdf61b1..e36c98083 100644 --- a/app/tasks/maintenance.php +++ b/app/tasks/maintenance.php @@ -39,6 +39,14 @@ $cli ]); } + function notifyDeleteConnections() + { + Resque::enqueue(Event::DELETE_QUEUE_NAME, Event::DELETE_CLASS_NAME, [ + 'type' => DELETE_TYPE_REALTIME, + 'timestamp' => time() - 60 + ]); + } + // # of days in seconds (1 day = 86400s) $interval = (int) App::getEnv('_APP_MAINTENANCE_INTERVAL', '86400'); $executionLogsRetention = (int) App::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', '1209600'); @@ -51,5 +59,6 @@ $cli notifyDeleteExecutionLogs($executionLogsRetention); notifyDeleteAbuseLogs($abuseLogsRetention); notifyDeleteAuditLogs($auditLogRetention); + notifyDeleteConnections(); }, $interval); }); \ No newline at end of file diff --git a/app/workers/deletes.php b/app/workers/deletes.php index a0a5708b6..7e8420bee 100644 --- a/app/workers/deletes.php +++ b/app/workers/deletes.php @@ -72,6 +72,10 @@ class DeletesV1 extends Worker $this->deleteAbuseLogs($this->args['timestamp']); break; + case DELETE_TYPE_REALTIME: + $this->deleteRealtimeUsage($this->args['timestamp']); + break; + case DELETE_TYPE_CERTIFICATES: $document = new Document($this->args['document']); $this->deleteCertificates($document); @@ -206,6 +210,19 @@ class DeletesV1 extends Worker }); } + protected function deleteRealtimeUsage($timestamp) + { + if (!($consoleDB = $this->getConsoleDB())) { + throw new Exception('Failed to get consoleDb.'); + } + // Delete Dead Realtime Logs + $this->deleteByGroup([ + '$collection='.Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + 'timestamp<'.$timestamp + ], $consoleDB); + + } + protected function deleteFunction(Document $document, $projectId) { $projectDB = $this->getProjectDB($projectId); diff --git a/src/Appwrite/Database/Database.php b/src/Appwrite/Database/Database.php index 400a14516..43f740379 100644 --- a/src/Appwrite/Database/Database.php +++ b/src/Appwrite/Database/Database.php @@ -41,6 +41,9 @@ class Database const SYSTEM_COLLECTION_FUNCTIONS = 'functions'; const SYSTEM_COLLECTION_TAGS = 'tags'; const SYSTEM_COLLECTION_EXECUTIONS = 'executions'; + + // Realtime + const SYSTEM_COLLECTION_CONNECTIONS = 'connections'; // Var Types const SYSTEM_VAR_TYPE_TEXT = 'text'; diff --git a/src/Appwrite/Messaging/Adapter.php b/src/Appwrite/Messaging/Adapter.php index d788e34d1..6ef2d5cfd 100644 --- a/src/Appwrite/Messaging/Adapter.php +++ b/src/Appwrite/Messaging/Adapter.php @@ -4,7 +4,7 @@ namespace Appwrite\Messaging; abstract class Adapter { - public abstract function subscribe(string $project, mixed $identifier, array $roles, array $channels): void; + public abstract function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void; public abstract function unsubscribe(mixed $identifier): void; - public static abstract function send(string $projectId, array $payload, string $event, array $channels, array $permissions, array $options): void; + public static abstract function send(string $projectId, array $payload, string $event, array $channels, array $roles, array $options): void; } diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 1bd1245a9..11eca4ade 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -4,6 +4,7 @@ namespace Appwrite\Messaging\Adapter; use Appwrite\Database\Document; use Appwrite\Messaging\Adapter; +use Redis; use Utopia\App; class Realtime extends Adapter @@ -35,15 +36,15 @@ class Realtime extends Adapter /** * Adds a subscribtion. - * @param string $projectId Project ID. - * @param mixed $connection Unique Identifier - Connection ID. - * @param array $roles Roles of the Subscription. - * @param array $channels Subscribed Channels. + * + * @param string $projectId + * @param mixed $identifier + * @param array $roles + * @param array $channels * @return void */ - public function subscribe(string $projectId, mixed $connection, array $roles, array $channels): void + public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void { - //TODO: merge project & channel to a single layer if (!isset($this->subscriptions[$projectId])) { // Init Project $this->subscriptions[$projectId] = []; } @@ -54,11 +55,11 @@ class Realtime extends Adapter } foreach ($channels as $channel => $list) { - $this->subscriptions[$projectId][$role][$channel][$connection] = true; + $this->subscriptions[$projectId][$role][$channel][$identifier] = true; } } - $this->connections[$connection] = [ + $this->connections[$identifier] = [ 'projectId' => $projectId, 'roles' => $roles, 'channels' => $channels @@ -119,7 +120,7 @@ class Realtime extends Adapter /** * Sends an event to the Realtime Server. - * @param string $project + * @param string $projectId * @param array $payload * @param string $event * @param array $channels @@ -127,9 +128,9 @@ class Realtime extends Adapter * @param array $options * @return void */ - public static function send(string $project, array $payload, string $event, array $channels, array $roles, array $options = []): void + public static function send(string $projectId, array $payload, string $event, array $channels, array $roles, array $options = []): void { - if (empty($channels) || empty($roles) || empty($project)) return; + if (empty($channels) || empty($roles) || empty($projectId)) return; $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; @@ -137,7 +138,7 @@ class Realtime extends Adapter $redis = new \Redis(); //TODO: make this part of the constructor $redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); $redis->publish('realtime', json_encode([ - 'project' => $project, + 'project' => $projectId, 'roles' => $roles, 'permissionsChanged' => $permissionsChanged, 'userId' => $userId, @@ -165,16 +166,34 @@ class Realtime extends Adapter */ public function getSubscribers(array $event) { - //TODO: do comments + $receivers = []; + /** + * Check if project has subscriber. + */ if (isset($this->subscriptions[$event['project']])) { + /** + * Iterate through each role. + */ foreach ($this->subscriptions[$event['project']] as $role => $subscription) { + /** + * Iterate through each channel. + */ foreach ($event['data']['channels'] as $channel) { + /** + * Check if channel has subscriber. Also taking care of the role in the event and the wildcard role. + */ if ( \array_key_exists($channel, $this->subscriptions[$event['project']][$role]) && (\in_array($role, $event['roles']) || \in_array('*', $event['roles'])) ) { + /** + * Saving all connections that are allowed to receive this event. + */ foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $id) { + /** + * To prevent duplicates, we save the connections as array keys. + */ $receivers[$id] = 0; } break; @@ -224,8 +243,10 @@ class Realtime extends Adapter /** * Create channels array based on the event name and payload. - * - * @return void + * + * @param string $event + * @param Document $payload + * @return array */ public static function fromPayload(string $event, Document $payload): array {