diff --git a/app/realtime.php b/app/realtime.php index b439ac519..90416aa17 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -1,6 +1,9 @@ on('message', function(\Swoole\Coroutine\Redis $redis, $rs) use ($server) { -// var_dump($server); -// echo 'redis got message' . PHP_EOL; -// var_dump($rs); -// $server->send(1, $rs); -// }); -// $redis->connect('redis', 6379, function(\Swoole\Coroutine\Redis $redis, $result){ +$connections = []; -// echo 'connected to redis' . PHP_EOL; -// $redis->subscribe('chat'); -// }); - -$server->on("workerStart", function ($server, $workerId) { +$server->on("workerStart", function ($server, $workerId) use (&$connections) { Console::success('Worker '.++$workerId.' started succefully'); - $redis = new Redis(); - $redis->connect('redis', 6379); + $attempts = 0; + $start = time(); + + while ($attempts < 3) { + try { + $redis = new Redis(); + $redis->connect('redis', 6379); + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); - $redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId) { - var_dump($redis, $channel, $message); - - $message = 'Message from worker #'.$workerId.'; '.$message; - - foreach($server->connections as $fd) { - if ($server->exist($fd) && $server->isEstablished($fd)) { - $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); + if($attempts > 0) { + Console::error('Connection lost (lasted '.(time() - $start).' seconds). Attempting restart (attempt #'.$attempts.')'); } - else { - $server->close($fd); + + if($redis->ping('')) { + $attempts = 0; } + + sleep(1); // 1 sec delay between connection attempts + + $redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) { + $message = 'Message from worker #'.$workerId.'; '.$message; + + Console::warning('Total connections: '.count($connections)); + + foreach($connections as $fd) { + if ($server->exist($fd) + && $server->isEstablished($fd) + ) { + Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')'); + + $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, + SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); + } + else { + $server->close($fd); + } + } + }); + + $attempts++; + + } catch (\Throwable $th) { + $attempts++; + continue; } - }); + } + + Console::error('Failed to restart connection...'); }); -$server->on('BeforeReload', function($serv, $workerId) { - Console::success('Starting reload...'); -}); - -$server->on('AfterReload', function($serv, $workerId) { - Console::success('Reload completed...'); -}); - -// $process = new Process(function($process) use ($server) { -// while (true) { -// $msg = $process->read(); - -// foreach($server->connections as $fd) { -// if ($server->exist($fd) && $server->isEstablished($fd)) { -// $server->push($fd, json_encode(['hey there']), SWOOLE_WEBSOCKET_OPCODE_TEXT, SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); -// } -// } - -// sleep(10); -// } -// }); - -// $server->addProcess($process); - $server->on("start", function (Server $server) { Console::success('Server started succefully'); }); -$server->on('open', function(Server $server, Swoole\Http\Request $request) { - echo "connection open: {$request->fd}\n"; +$server->on('open', function(Server $server, Request $request) use (&$connections) { + $connections[] = $request->fd; + + Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})"); + Console::info('Total connections: '.count($connections)); - foreach($server->connections as $fd) { - if ($server->exist($fd) && $server->isEstablished($fd)) { - $server->push($fd, json_encode(['hey there', count($server->ports[0]->connections), ]), SWOOLE_WEBSOCKET_OPCODE_TEXT, SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); - } - } - - $server->push($request->fd, json_encode(["hello", time()])); + $server->push($request->fd, json_encode(["hello", count($connections)])); }); $server->on('message', function(Server $server, Frame $frame) { - echo "received message: {$frame->data}\n"; + if($frame->data === 'reload') { + $server->reload(); + } + + Console::info('Recieved message: '.$frame->data.' (user: '.$frame->fd.', worker: '.$server->getWorkerId().')'); + $server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()])); }); $server->on('close', function(Server $server, int $fd) { - echo "connection close: {$fd}\n"; + Console::error('Connection close: '.$fd); }); $server->start(); \ No newline at end of file