1
0
Fork 0
mirror of synced 2024-09-30 17:26:48 +13:00

Added connection open/close logic

This commit is contained in:
Eldad Fux 2020-10-21 15:03:50 +03:00
parent 619781a4ad
commit c491b78114

View file

@ -32,11 +32,10 @@ ini_set('default_socket_timeout', -1);
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL); Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
$server = new Server("0.0.0.0", 80); $server = new Server("0.0.0.0", 80);
$connections = [];
$subscriptions = []; $subscriptions = [];
$connections = [];
$server->on("workerStart", function ($server, $workerId) use (&$connections) { $server->on("workerStart", function ($server, $workerId) use (&$subscriptions) {
Console::success('Worker '.++$workerId.' started succefully'); Console::success('Worker '.++$workerId.' started succefully');
$attempts = 0; $attempts = 0;
@ -45,7 +44,7 @@ $server->on("workerStart", function ($server, $workerId) use (&$connections) {
while ($attempts < 300) { while ($attempts < 300) {
try { try {
if($attempts > 0) { if($attempts > 0) {
Console::error('Connection lost (lasted '.(time() - $start).' seconds). Attempting restart in 5 seconds (attempt #'.$attempts.')'); Console::error('Pub/sub connection lost (lasted '.(time() - $start).' seconds). Attempting restart in 5 seconds (attempt #'.$attempts.')');
sleep(5); // 1 sec delay between connection attempts sleep(5); // 1 sec delay between connection attempts
} }
@ -55,32 +54,32 @@ $server->on("workerStart", function ($server, $workerId) use (&$connections) {
if($redis->ping(true)) { if($redis->ping(true)) {
$attempts = 0; $attempts = 0;
Console::success('Connection established'); Console::success('Pub/sub connection established');
} }
else { else {
Console::error('Connection failed'); Console::error('Pub/sub failed');
} }
$redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) { $redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId) {
$message = 'Message from worker #'.$workerId.'; '.$message; $message = 'Message from worker #'.$workerId.'; '.$message;
foreach($connections as $fd) { // foreach($connections as $fd) {
if ($server->exist($fd) // if ($server->exist($fd)
&& $server->isEstablished($fd) // && $server->isEstablished($fd)
) { // ) {
Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')'); // Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')');
$server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, // $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT,
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS); // SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
} // }
else { // else {
$server->close($fd); // $server->close($fd);
} // }
} // }
}); });
} catch (\Throwable $th) { } catch (\Throwable $th) {
Console::error('Connection error: '.$th->getMessage()); Console::error('Pub/sub error: '.$th->getMessage());
$attempts++; $attempts++;
continue; continue;
} }
@ -88,7 +87,7 @@ $server->on("workerStart", function ($server, $workerId) use (&$connections) {
$attempts++; $attempts++;
} }
Console::error('Failed to restart connection...'); Console::error('Failed to restart pub/sub...');
}); });
$server->on("start", function (Server $server) { $server->on("start", function (Server $server) {
@ -103,14 +102,10 @@ $server->on("start", function (Server $server) {
}); });
}); });
$server->on('open', function(Server $server, Request $request) use (&$connections) { $server->on('open', function(Server $server, Request $request) use (&$connections, &$subscriptions) {
$connections[] = $request->fd;
Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})"); Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})");
Console::info('Total connections: '.count($connections));
$connection = $request->fd; $connection = $request->fd;
$app = new App('Asia/Tel_Aviv');
$request = new SwooleRequest($request); $request = new SwooleRequest($request);
App::setResource('request', function () use ($request) { App::setResource('request', function () use ($request) {
@ -121,6 +116,7 @@ $server->on('open', function(Server $server, Request $request) use (&$connection
return null; return null;
}); });
$channels = array_flip($request->getQuery('channels', []));
$user = App::getResource('user'); $user = App::getResource('user');
$project = App::getResource('project'); $project = App::getResource('project');
@ -136,16 +132,20 @@ $server->on('open', function(Server $server, Request $request) use (&$connection
$subscriptions[$project->getId()] = []; $subscriptions[$project->getId()] = [];
} }
if(isset($subscriptions[$project->getId()][$user->getId()])) { // Close previous connection if(!isset($subscriptions[$project->getId()][$user->getId()])) { // Add user first connection
$server->close($subscriptions[$project->getId()][$user->getId()]['connection']); $subscriptions[$project->getId()][$user->getId()] = [];
} }
$subscriptions[$project->getId()][$user->getId()] = [ foreach ($channels as $channel => $list) {
'channels' => [], $subscriptions[$project->getId()][$user->getId()][$channel][$connection] = true;
'connection' => $connection, }
$connections[$connection] = [
'projectId' => $project->getId(),
'userId' => $user->getId()
]; ];
$server->push($connection, json_encode(["hello", count($connections)])); $server->push($connection, json_encode($subscriptions));
}); });
$server->on('message', function(Server $server, Frame $frame) { $server->on('message', function(Server $server, Frame $frame) {
@ -158,8 +158,27 @@ $server->on('message', function(Server $server, Frame $frame) {
$server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()])); $server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()]));
}); });
$server->on('close', function(Server $server, int $fd) { $server->on('close', function(Server $server, int $fd) use (&$connections, &$subscriptions) {
Console::error('Connection close: '.$fd); Console::error('Connection close: '.$fd);
$projectId = $connections[$fd]['projectId'] ?? '';
$userId = $connections[$fd]['userId'] ?? '';
foreach ($subscriptions[$projectId][$userId] as $channel => $list) {
unset($subscriptions[$projectId][$userId][$channel][$fd]); // Remove connection
if(empty($list)) {
unset($subscriptions[$projectId][$userId][$channel]); // Remove channel
}
}
if(empty($subscriptions[$projectId][$userId])) {
unset($subscriptions[$projectId][$userId]); // Remove user
}
unset($connections[$fd]);
var_dump($subscriptions);
}); });
$server->start(); $server->start();