Added redis persistent connection
This commit is contained in:
parent
3f0595c04a
commit
77e35e7081
1 changed files with 64 additions and 62 deletions
108
app/realtime.php
108
app/realtime.php
|
@ -1,6 +1,9 @@
|
||||||
<?php
|
<?php
|
||||||
|
|
||||||
|
require_once __DIR__.'/../vendor/autoload.php';
|
||||||
|
|
||||||
use Swoole\WebSocket\Server;
|
use Swoole\WebSocket\Server;
|
||||||
|
use Swoole\Http\Request;
|
||||||
use Swoole\WebSocket\Frame;
|
use Swoole\WebSocket\Frame;
|
||||||
use Utopia\CLI\Console;
|
use Utopia\CLI\Console;
|
||||||
|
|
||||||
|
@ -18,92 +21,91 @@ use Utopia\CLI\Console;
|
||||||
* - https://github.com/shuixn/socket.io-swoole-server
|
* - https://github.com/shuixn/socket.io-swoole-server
|
||||||
*/
|
*/
|
||||||
|
|
||||||
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL | SWOOLE_HOOK_CURL);
|
ini_set('default_socket_timeout', -1);
|
||||||
|
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||||
|
|
||||||
$server = new Server("0.0.0.0", 80);
|
$server = new Server("0.0.0.0", 80);
|
||||||
|
|
||||||
// $redis = new \Swoole\Coroutine\Redis();
|
$connections = [];
|
||||||
// $redis->on('message', function(\Swoole\Coroutine\Redis $redis, $rs) use ($server) {
|
|
||||||
// var_dump($server);
|
|
||||||
// echo 'redis got message' . PHP_EOL;
|
|
||||||
// var_dump($rs);
|
|
||||||
// $server->send(1, $rs);
|
|
||||||
// });
|
|
||||||
// $redis->connect('redis', 6379, function(\Swoole\Coroutine\Redis $redis, $result){
|
|
||||||
|
|
||||||
// echo 'connected to redis' . PHP_EOL;
|
$server->on("workerStart", function ($server, $workerId) use (&$connections) {
|
||||||
// $redis->subscribe('chat');
|
|
||||||
// });
|
|
||||||
|
|
||||||
$server->on("workerStart", function ($server, $workerId) {
|
|
||||||
Console::success('Worker '.++$workerId.' started succefully');
|
Console::success('Worker '.++$workerId.' started succefully');
|
||||||
|
|
||||||
|
$attempts = 0;
|
||||||
|
$start = time();
|
||||||
|
|
||||||
|
while ($attempts < 3) {
|
||||||
|
try {
|
||||||
$redis = new Redis();
|
$redis = new Redis();
|
||||||
$redis->connect('redis', 6379);
|
$redis->connect('redis', 6379);
|
||||||
|
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||||
|
|
||||||
$redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId) {
|
if($attempts > 0) {
|
||||||
var_dump($redis, $channel, $message);
|
Console::error('Connection lost (lasted '.(time() - $start).' seconds). Attempting restart (attempt #'.$attempts.')');
|
||||||
|
}
|
||||||
|
|
||||||
|
if($redis->ping('')) {
|
||||||
|
$attempts = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(1); // 1 sec delay between connection attempts
|
||||||
|
|
||||||
|
$redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) {
|
||||||
$message = 'Message from worker #'.$workerId.'; '.$message;
|
$message = 'Message from worker #'.$workerId.'; '.$message;
|
||||||
|
|
||||||
foreach($server->connections as $fd) {
|
Console::warning('Total connections: '.count($connections));
|
||||||
if ($server->exist($fd) && $server->isEstablished($fd)) {
|
|
||||||
$server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT, SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
|
foreach($connections as $fd) {
|
||||||
|
if ($server->exist($fd)
|
||||||
|
&& $server->isEstablished($fd)
|
||||||
|
) {
|
||||||
|
Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')');
|
||||||
|
|
||||||
|
$server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT,
|
||||||
|
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
$server->close($fd);
|
$server->close($fd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
$attempts++;
|
||||||
|
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
$attempts++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Console::error('Failed to restart connection...');
|
||||||
});
|
});
|
||||||
|
|
||||||
$server->on('BeforeReload', function($serv, $workerId) {
|
|
||||||
Console::success('Starting reload...');
|
|
||||||
});
|
|
||||||
|
|
||||||
$server->on('AfterReload', function($serv, $workerId) {
|
|
||||||
Console::success('Reload completed...');
|
|
||||||
});
|
|
||||||
|
|
||||||
// $process = new Process(function($process) use ($server) {
|
|
||||||
// while (true) {
|
|
||||||
// $msg = $process->read();
|
|
||||||
|
|
||||||
// foreach($server->connections as $fd) {
|
|
||||||
// if ($server->exist($fd) && $server->isEstablished($fd)) {
|
|
||||||
// $server->push($fd, json_encode(['hey there']), SWOOLE_WEBSOCKET_OPCODE_TEXT, SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// sleep(10);
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
|
|
||||||
// $server->addProcess($process);
|
|
||||||
|
|
||||||
$server->on("start", function (Server $server) {
|
$server->on("start", function (Server $server) {
|
||||||
Console::success('Server started succefully');
|
Console::success('Server started succefully');
|
||||||
});
|
});
|
||||||
|
|
||||||
$server->on('open', function(Server $server, Swoole\Http\Request $request) {
|
$server->on('open', function(Server $server, Request $request) use (&$connections) {
|
||||||
echo "connection open: {$request->fd}\n";
|
$connections[] = $request->fd;
|
||||||
|
|
||||||
foreach($server->connections as $fd) {
|
Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})");
|
||||||
if ($server->exist($fd) && $server->isEstablished($fd)) {
|
Console::info('Total connections: '.count($connections));
|
||||||
$server->push($fd, json_encode(['hey there', count($server->ports[0]->connections), ]), SWOOLE_WEBSOCKET_OPCODE_TEXT, SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$server->push($request->fd, json_encode(["hello", time()]));
|
$server->push($request->fd, json_encode(["hello", count($connections)]));
|
||||||
});
|
});
|
||||||
|
|
||||||
$server->on('message', function(Server $server, Frame $frame) {
|
$server->on('message', function(Server $server, Frame $frame) {
|
||||||
echo "received message: {$frame->data}\n";
|
if($frame->data === 'reload') {
|
||||||
|
$server->reload();
|
||||||
|
}
|
||||||
|
|
||||||
|
Console::info('Recieved message: '.$frame->data.' (user: '.$frame->fd.', worker: '.$server->getWorkerId().')');
|
||||||
|
|
||||||
$server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()]));
|
$server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()]));
|
||||||
});
|
});
|
||||||
|
|
||||||
$server->on('close', function(Server $server, int $fd) {
|
$server->on('close', function(Server $server, int $fd) {
|
||||||
echo "connection close: {$fd}\n";
|
Console::error('Connection close: '.$fd);
|
||||||
});
|
});
|
||||||
|
|
||||||
$server->start();
|
$server->start();
|
Loading…
Reference in a new issue