diff --git a/app/realtime.php b/app/realtime.php index a39b8ea45..88abbc20d 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -4,6 +4,7 @@ use Appwrite\Auth\Auth; use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Database; +use Appwrite\Database\Validator\Authorization; use Appwrite\Event\Event; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; @@ -39,14 +40,70 @@ $stats->column('connectionsTotal', Table::TYPE_INT); $stats->column('messages', Table::TYPE_INT); $stats->create(); +$containerId = uniqid(); +$documentId = null; + $server = new Server($adapter); $realtime = new Realtime(); -$server->onStart(function () use ($stats) { +$server->onStart(function () use ($stats, $register, $containerId, &$documentId) { Console::success('Server started succefully'); - Timer::tick(10000, function () use ($stats) { + $getConsoleDb = function () use ($register) { + $db = $register->get('dbPool')->get(); + $cache = $register->get('redisPool')->get(); + + $consoleDb = new Database(); + $consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); + $consoleDb->setNamespace('app_console'); + $consoleDb->setMocks(Config::getParam('collections', [])); + + return [ + $consoleDb, + function () use ($register, $db, $cache) { + $register->get('dbPool')->put($db); + $register->get('redisPool')->put($cache); + } + ]; + }; + + /** + * Create document for this worker for connection stats across Containers. + */ + go(function () use ($getConsoleDb, $containerId, &$documentId) { + try { + [$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb); + $document = [ + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $containerId, + 'timestamp' => time(), + 'value' => '{}' + ]; + Authorization::disable(); + $document = $consoleDb->createDocument($document); + Authorization::enable(); + $documentId = $document->getId(); + } catch (\Throwable $th) { + Console::error('[Error] Type: ' . get_class($th)); + Console::error('[Error] Message: ' . $th->getMessage()); + Console::error('[Error] File: ' . $th->getFile()); + Console::error('[Error] Line: ' . $th->getLine()); + } finally { + call_user_func($returnConsoleDb); + } + }); + + /** + * Save current connections to the Database every 5 seconds. + */ + Timer::tick(5000, function () use ($stats, $getConsoleDb, $containerId, &$documentId) { + [$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb); + foreach ($stats as $projectId => $value) { if (empty($value['connections']) && empty($value['messages'])) { continue; @@ -73,6 +130,36 @@ $server->onStart(function () use ($stats) { $usage->trigger(); } } + $payload = []; + foreach ($stats as $projectId => $value) { + if (!empty($value['connectionsTotal'])) { + $payload[$projectId] = $value['connectionsTotal']; + } + } + if (empty($payload)) { + return; + } + $document = [ + '$id' => $documentId, + '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS, + '$permissions' => [ + 'read' => ['*'], + 'write' => ['*'], + ], + 'container' => $containerId, + 'timestamp' => time(), + 'value' => json_encode($payload) + ]; + try { + $document = $consoleDb->updateDocument($document); + } catch (\Throwable $th) { + Console::error('[Error] Type: ' . get_class($th)); + Console::error('[Error] Message: ' . $th->getMessage()); + Console::error('[Error] File: ' . $th->getFile()); + Console::error('[Error] Line: ' . $th->getLine()); + } finally { + call_user_func($returnConsoleDb); + } }); }); @@ -81,27 +168,56 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $attempts = 0; $start = time(); - $redisPool = $register->get('redisPool'); /** * Sending current connections to project channels on the console project every 5 seconds. */ - Timer::tick(5000, function () use ($server, $stats, $realtime) { + Timer::tick(5000, function () use ($server, $register, $realtime) { if ($realtime->hasSubscriber('console', 'role:member', 'project')) { + $db = $register->get('dbPool')->get(); + $cache = $register->get('redisPool')->get(); + + $consoleDb = new Database(); + $consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); + $consoleDb->setNamespace('app_console'); + $consoleDb->setMocks(Config::getParam('collections', [])); + + $projectDb = new Database(); + $projectDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache)); + $projectDb->setMocks(Config::getParam('collections', [])); + $payload = []; - foreach ($stats as $projectId => $value) { - $payload[$projectId] = $value['connectionsTotal']; + $list = $consoleDb->getCollection([ + 'filters' => [ + '$collection=' . Database::SYSTEM_COLLECTION_CONNECTIONS, + 'timestamp>' . (time() - 15) + ], + ]); + + foreach ($list as $document) { + foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { + if (array_key_exists($projectId, $payload)) { + $payload[$projectId] += $value; + } else { + $payload[$projectId] = $value; + } + } } $event = [ - 'event' => 'stats.connections', - 'channels' => ['project'], + 'project' => 'console', 'permissions' => ['role:member'], - 'timestamp' => time(), - 'payload' => $payload + 'data' => [ + 'event' => 'stats.connections', + 'channels' => ['project'], + 'timestamp' => time(), + 'payload' => $payload + ] ]; $server->send($realtime->getReceivers($event), json_encode($event)); + $register->get('dbPool')->put($db); + $register->get('redisPool')->put($cache); } }); @@ -115,7 +231,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $start = time(); /** @var Redis $redis */ - $redis = $redisPool->get(); + $redis = $register->get('redisPool')->get(); $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -177,7 +293,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, }); } catch (\Throwable $th) { Console::error('Pub/sub error: ' . $th->getMessage()); - $redisPool->put($redis); + $register->get('redisPool')->put($redis); $attempts++; continue; }