diff --git a/app/realtime.php b/app/realtime.php index 6d94b2cc4a..23fb699a35 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -1,6 +1,8 @@ 64000 // Default maximum Package Size (64kb) ]; +$register->set('db', function () { + $dbHost = App::getEnv('_APP_DB_HOST', ''); + $dbUser = App::getEnv('_APP_DB_USER', ''); + $dbPass = App::getEnv('_APP_DB_PASS', ''); + $dbScheme = App::getEnv('_APP_DB_SCHEMA', ''); + + $pdo = new PDO("mysql:host={$dbHost};dbname={$dbScheme};charset=utf8mb4", $dbUser, $dbPass, array( + PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4', + PDO::ATTR_TIMEOUT => 3, // Seconds + PDO::ATTR_PERSISTENT => true, + PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC, + PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, + )); + + return $pdo; +}); +$register->set('cache', function () { // Register cache connection + $redis = new Redis(); + $redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', '')); + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); + + return $redis; +}); + $realtimeServer = new Server($register, config: $config); diff --git a/src/Appwrite/Realtime/Server.php b/src/Appwrite/Realtime/Server.php index e11fb43323..b397bc066c 100644 --- a/src/Appwrite/Realtime/Server.php +++ b/src/Appwrite/Realtime/Server.php @@ -5,6 +5,7 @@ namespace Appwrite\Realtime; use Appwrite\Database\Database; use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Adapter\Redis as RedisAdapter; +use Appwrite\Database\Validator\Authorization; use Appwrite\Event\Event; use Appwrite\Network\Validator\Origin; use Appwrite\Utopia\Response; @@ -42,10 +43,10 @@ class Server public SwooleServer $server; /** - * Container scoped Table. + * Container scoped Usage Table. * @var Table */ - public Table $stats; + public Table $usage; /** * Container scoped Database connection. @@ -77,12 +78,12 @@ class Server $this->connections = []; $this->register = $register; - $this->stats = new Table(4096, 1); - $this->stats->column('projectId', Table::TYPE_STRING, 64); - $this->stats->column('connections', Table::TYPE_INT); - $this->stats->column('connectionsTotal', Table::TYPE_INT); - $this->stats->column('messages', Table::TYPE_INT); - $this->stats->create(); + $this->usage = new Table(4096, 1); + $this->usage->column('projectId', Table::TYPE_STRING, 64); + $this->usage->column('connections', Table::TYPE_INT); + $this->usage->column('connectionsTotal', Table::TYPE_INT); + $this->usage->column('messages', Table::TYPE_INT); + $this->usage->create(); $this->db = new Database(); $this->db->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register)); @@ -96,6 +97,7 @@ class Server $this->server->on('open', [$this, 'onOpen']); $this->server->on('message', [$this, 'onMessage']); $this->server->on('close', [$this, 'onClose']); + $this->server->container_id = uniqid(); $this->server->start(); } @@ -110,9 +112,34 @@ class Server Console::success('Server started succefully'); Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}"); + try { + go(function() { + $document = [ + '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $this->server->container_id, + 'timestamp' => time(), + 'data' => '{}' + ]; + Authorization::disable(); + $document = $this->db->createDocument($document); + Authorization::enable(); + $this->server->document_id = $document->getId(); + }); + } catch (\Throwable $th) { + Console::error('[Error] Type: '.get_class($th)); + Console::error('[Error] Message: '.$th->getMessage()); + Console::error('[Error] File: '.$th->getFile()); + Console::error('[Error] Line: '.$th->getLine()); + } + + Timer::tick(10000, function () { /** @var Table $stats */ - foreach ($this->stats as $projectId => $value) { + foreach ($this->usage as $projectId => $value) { if (empty($value['connections']) && empty($value['messages'])) { continue; } @@ -128,7 +155,7 @@ class Server ->setParam('networkRequestSize', 0) ->setParam('networkResponseSize', 0); - $this->stats->set($projectId, [ + $this->usage->set($projectId, [ 'projectId' => $projectId, 'messages' => 0, 'connections' => 0 @@ -140,6 +167,37 @@ class Server } }); + Timer::tick(10000, function () { + $payload = []; + foreach ($this->usage as $projectId => $value) { + if (!empty($value['connectionsTotal'])) { + $payload[$projectId] = $value['connectionsTotal']; + } + } + if (empty($payload)){ + return; + } + $document = [ + '$id' => $this->server->document_id, + '$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $this->server->container_id, + 'timestamp' => time(), + 'data' => json_encode($payload) + ]; + try { + $document = $this->db->updateDocument($document); + } catch (\Throwable $th) { + Console::error('[Error] Type: '.get_class($th)); + Console::error('[Error] Message: '.$th->getMessage()); + Console::error('[Error] File: '.$th->getFile()); + Console::error('[Error] Line: '.$th->getLine()); + } + }); + Process::signal(2, function () use ($server) { Console::log('Stop by Ctrl+C'); $server->shutdown(); @@ -302,8 +360,8 @@ class Server $server->push($connection, json_encode($channels)); - $this->stats->incr($project->getId(), 'connections'); - $this->stats->incr($project->getId(), 'connectionsTotal'); + $this->usage->incr($project->getId(), 'connections'); + $this->usage->incr($project->getId(), 'connectionsTotal'); } catch (\Throwable $th) { $response = [ 'code' => $th->getCode(), @@ -353,7 +411,7 @@ class Server public function onClose(SwooleServer $server, int $connection) { if (array_key_exists($connection, $this->connections)) { - $this->stats->decr($this->connections[$connection]['projectId'], 'connectionsTotal'); + $this->usage->decr($this->connections[$connection]['projectId'], 'connectionsTotal'); } Parser::unsubscribe($connection, $this->subscriptions, $this->connections); Console::info('Connection close: ' . $connection); @@ -405,7 +463,7 @@ class Server } } if (($num = count($receivers)) > 0) { - $this->stats->incr($event['project'], 'messages', $num); + $this->usage->incr($event['project'], 'messages', $num); } } @@ -423,8 +481,21 @@ class Server && array_key_exists('project', $this->subscriptions['console']['role:member']) ) { $payload = []; - foreach ($this->stats as $projectId => $value) { - $payload[$projectId] = $value['connectionsTotal']; + $list = $this->db->getCollection([ + 'filters' => [ + '$collection='.Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS, + 'timestamp>'.(time() - 15) + ], + ]); + + foreach ($list as $document) { + foreach (json_decode($document->getAttribute('data')) as $projectId => $value) { + if (array_key_exists($projectId, $payload)) { + $payload[$projectId] += $value; + } else { + $payload[$projectId] = $value; + } + } } foreach ($this->subscriptions['console']['role:member']['project'] as $connection => $value) { $server->push(