1
0
Fork 0
mirror of synced 2024-06-29 19:50:26 +12:00
appwrite/app/realtime.php

115 lines
3.5 KiB
PHP
Raw Normal View History

2020-10-16 20:31:09 +13:00
<?php
2020-10-20 04:09:53 +13:00
require_once __DIR__.'/../vendor/autoload.php';
2020-10-16 20:31:09 +13:00
use Swoole\WebSocket\Server;
2020-10-20 04:09:53 +13:00
use Swoole\Http\Request;
2020-10-16 20:31:09 +13:00
use Swoole\WebSocket\Frame;
2020-10-19 00:51:16 +13:00
use Utopia\CLI\Console;
2020-10-16 20:31:09 +13:00
/**
* TODO List
*
* - Abuse Control / x mesages per connection
* - CORS Validation
* - Limit payload size
* - Message structure: { status: "ok"|"error", event: EVENT_NAME, data: <any arbitrary data> }
* - JWT Authentication (in path / or in message)
2020-10-17 18:48:03 +13:00
*
*
* - https://github.com/hhxsv5/php-sse
* - https://github.com/shuixn/socket.io-swoole-server
2020-10-16 20:31:09 +13:00
*/
2020-10-20 04:09:53 +13:00
ini_set('default_socket_timeout', -1);
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
2020-10-19 00:51:16 +13:00
2020-10-16 20:31:09 +13:00
$server = new Server("0.0.0.0", 80);
2020-10-20 04:09:53 +13:00
$connections = [];
2020-10-19 00:51:16 +13:00
2020-10-20 04:09:53 +13:00
$server->on("workerStart", function ($server, $workerId) use (&$connections) {
2020-10-19 00:51:16 +13:00
Console::success('Worker '.++$workerId.' started succefully');
2020-10-20 04:09:53 +13:00
$attempts = 0;
$start = time();
while ($attempts < 3) {
try {
$redis = new Redis();
$redis->connect('redis', 6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if($attempts > 0) {
Console::error('Connection lost (lasted '.(time() - $start).' seconds). Attempting restart (attempt #'.$attempts.')');
2020-10-19 00:51:16 +13:00
}
2020-10-20 04:09:53 +13:00
if($redis->ping('')) {
$attempts = 0;
2020-10-20 07:56:02 +13:00
Console::success('Connection established');
}
else {
Console::error('Connection failed');
2020-10-19 00:51:16 +13:00
}
2020-10-20 04:09:53 +13:00
sleep(1); // 1 sec delay between connection attempts
$redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) {
$message = 'Message from worker #'.$workerId.'; '.$message;
Console::warning('Total connections: '.count($connections));
foreach($connections as $fd) {
if ($server->exist($fd)
&& $server->isEstablished($fd)
) {
Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')');
$server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT,
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
}
else {
$server->close($fd);
}
}
});
$attempts++;
} catch (\Throwable $th) {
$attempts++;
continue;
}
}
2020-10-19 00:51:16 +13:00
2020-10-20 04:09:53 +13:00
Console::error('Failed to restart connection...');
2020-10-19 00:51:16 +13:00
});
2020-10-16 20:31:09 +13:00
$server->on("start", function (Server $server) {
2020-10-19 00:51:16 +13:00
Console::success('Server started succefully');
2020-10-16 20:31:09 +13:00
});
2020-10-20 04:09:53 +13:00
$server->on('open', function(Server $server, Request $request) use (&$connections) {
$connections[] = $request->fd;
Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})");
Console::info('Total connections: '.count($connections));
2020-10-19 00:51:16 +13:00
2020-10-20 04:09:53 +13:00
$server->push($request->fd, json_encode(["hello", count($connections)]));
2020-10-16 20:31:09 +13:00
});
$server->on('message', function(Server $server, Frame $frame) {
2020-10-20 04:09:53 +13:00
if($frame->data === 'reload') {
$server->reload();
}
Console::info('Recieved message: '.$frame->data.' (user: '.$frame->fd.', worker: '.$server->getWorkerId().')');
2020-10-19 00:51:16 +13:00
$server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()]));
2020-10-16 20:31:09 +13:00
});
$server->on('close', function(Server $server, int $fd) {
2020-10-20 04:09:53 +13:00
Console::error('Connection close: '.$fd);
2020-10-16 20:31:09 +13:00
});
$server->start();