diff --git a/app/realtime.php b/app/realtime.php index 497d71a8ec..48ea450927 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -32,11 +32,10 @@ ini_set('default_socket_timeout', -1); Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL); $server = new Server("0.0.0.0", 80); - -$connections = []; $subscriptions = []; +$connections = []; -$server->on("workerStart", function ($server, $workerId) use (&$connections) { +$server->on("workerStart", function ($server, $workerId) use (&$subscriptions) { Console::success('Worker '.++$workerId.' started succefully'); $attempts = 0; @@ -45,7 +44,7 @@ $server->on("workerStart", function ($server, $workerId) use (&$connections) { while ($attempts < 300) { try { if($attempts > 0) { - Console::error('Connection lost (lasted '.(time() - $start).' seconds). Attempting restart in 5 seconds (attempt #'.$attempts.')'); + Console::error('Pub/sub connection lost (lasted '.(time() - $start).' seconds). Attempting restart in 5 seconds (attempt #'.$attempts.')'); sleep(5); // 1 sec delay between connection attempts } @@ -55,32 +54,32 @@ $server->on("workerStart", function ($server, $workerId) use (&$connections) { if($redis->ping(true)) { $attempts = 0; - Console::success('Connection established'); + Console::success('Pub/sub connection established'); } else { - Console::error('Connection failed'); + Console::error('Pub/sub failed'); } - $redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) { + $redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId) { $message = 'Message from worker #'.$workerId.'; '.$message; - foreach($connections as $fd) { - if ($server->exist($fd) - && $server->isEstablished($fd) - ) { - Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')'); + // foreach($connections as $fd) { + // if ($server->exist($fd) + // && $server->isEstablished($fd) + // ) { + // Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')'); - $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, - SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); - } - else { - $server->close($fd); - } - } + // $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, + // SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); + // } + // else { + // $server->close($fd); + // } + // } }); } catch (\Throwable $th) { - Console::error('Connection error: '.$th->getMessage()); + Console::error('Pub/sub error: '.$th->getMessage()); $attempts++; continue; } @@ -88,7 +87,7 @@ $server->on("workerStart", function ($server, $workerId) use (&$connections) { $attempts++; } - Console::error('Failed to restart connection...'); + Console::error('Failed to restart pub/sub...'); }); $server->on("start", function (Server $server) { @@ -103,14 +102,10 @@ $server->on("start", function (Server $server) { }); }); -$server->on('open', function(Server $server, Request $request) use (&$connections) { - $connections[] = $request->fd; - +$server->on('open', function(Server $server, Request $request) use (&$connections, &$subscriptions) { Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})"); - Console::info('Total connections: '.count($connections)); $connection = $request->fd; - $app = new App('Asia/Tel_Aviv'); $request = new SwooleRequest($request); App::setResource('request', function () use ($request) { @@ -121,6 +116,7 @@ $server->on('open', function(Server $server, Request $request) use (&$connection return null; }); + $channels = array_flip($request->getQuery('channels', [])); $user = App::getResource('user'); $project = App::getResource('project'); @@ -136,16 +132,20 @@ $server->on('open', function(Server $server, Request $request) use (&$connection $subscriptions[$project->getId()] = []; } - if(isset($subscriptions[$project->getId()][$user->getId()])) { // Close previous connection - $server->close($subscriptions[$project->getId()][$user->getId()]['connection']); + if(!isset($subscriptions[$project->getId()][$user->getId()])) { // Add user first connection + $subscriptions[$project->getId()][$user->getId()] = []; } - $subscriptions[$project->getId()][$user->getId()] = [ - 'channels' => [], - 'connection' => $connection, + foreach ($channels as $channel => $list) { + $subscriptions[$project->getId()][$user->getId()][$channel][$connection] = true; + } + + $connections[$connection] = [ + 'projectId' => $project->getId(), + 'userId' => $user->getId() ]; - $server->push($connection, json_encode(["hello", count($connections)])); + $server->push($connection, json_encode($subscriptions)); }); $server->on('message', function(Server $server, Frame $frame) { @@ -158,8 +158,27 @@ $server->on('message', function(Server $server, Frame $frame) { $server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()])); }); -$server->on('close', function(Server $server, int $fd) { +$server->on('close', function(Server $server, int $fd) use (&$connections, &$subscriptions) { Console::error('Connection close: '.$fd); + + $projectId = $connections[$fd]['projectId'] ?? ''; + $userId = $connections[$fd]['userId'] ?? ''; + + foreach ($subscriptions[$projectId][$userId] as $channel => $list) { + unset($subscriptions[$projectId][$userId][$channel][$fd]); // Remove connection + + if(empty($list)) { + unset($subscriptions[$projectId][$userId][$channel]); // Remove channel + } + } + + if(empty($subscriptions[$projectId][$userId])) { + unset($subscriptions[$projectId][$userId]); // Remove user + } + + unset($connections[$fd]); + + var_dump($subscriptions); }); $server->start(); \ No newline at end of file