1
0
Fork 0
mirror of synced 2024-06-29 19:50:26 +12:00
appwrite/app/realtime.php

109 lines
3.4 KiB
PHP
Raw Normal View History

2020-10-16 20:31:09 +13:00
<?php
use Swoole\WebSocket\Server;
use Swoole\WebSocket\Frame;
2020-10-19 00:51:16 +13:00
use Utopia\CLI\Console;
2020-10-16 20:31:09 +13:00
/**
* TODO List
*
* - Abuse Control / x messages per connection
* - CORS Validation
* - Limit payload size
* - Message structure: { status: "ok"|"error", event: EVENT_NAME, data: <any arbitrary data> }
* - JWT Authentication (in path / or in message)
2020-10-17 18:48:03 +13:00
*
*
* - https://github.com/hhxsv5/php-sse
* - https://github.com/shuixn/socket.io-swoole-server
2020-10-16 20:31:09 +13:00
*/
2020-10-19 00:51:16 +13:00
// Turn on Swoole's runtime coroutine hooks: everything covered by
// SWOOLE_HOOK_ALL plus, explicitly, the cURL hook.
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL | SWOOLE_HOOK_CURL);
2020-10-16 20:31:09 +13:00
// WebSocket server bound to all interfaces (0.0.0.0) on port 80.
// Event handlers are attached to this instance further down the file.
$server = new Server("0.0.0.0", 80);
2020-10-19 00:51:16 +13:00
// $redis = new \Swoole\Coroutine\Redis();
// $redis->on('message', function(\Swoole\Coroutine\Redis $redis, $rs) use ($server) {
// var_dump($server);
// echo 'redis got message' . PHP_EOL;
// var_dump($rs);
// $server->send(1, $rs);
// });
// $redis->connect('redis', 6379, function(\Swoole\Coroutine\Redis $redis, $result){
// echo 'connected to redis' . PHP_EOL;
// $redis->subscribe('chat');
// });
// Worker start-up: logs the worker, connects to Redis and subscribes to the
// 'realtime' channel. Every message published on that channel is prefixed with
// the worker number and pushed to all established WebSocket connections.
//
// $server   - the WebSocket server the worker belongs to.
// $workerId - zero-based worker id supplied by Swoole; reported 1-based below.
$server->on("workerStart", function (Server $server, int $workerId) {
    // Compute the 1-based number once instead of mutating the parameter inside
    // the log expression; the same number is reused in relayed messages.
    $workerNumber = $workerId + 1;
    Console::success('Worker '.$workerNumber.' started succefully');

    $redis = new Redis();
    $redis->connect('redis', 6379);

    // NOTE(review): subscribe() is presumably long-running here; confirm whether
    // a read timeout on the Redis connection can end an idle subscription and
    // whether OPT_READ_TIMEOUT should be set for this connection.
    $redis->subscribe(['realtime'], function ($redis, $channel, $message) use ($server, $workerNumber) {
        $payload = 'Message from worker #'.$workerNumber.'; '.$message;
        $flags = SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS;

        foreach ($server->connections as $fd) {
            // Skip connections that are gone or have not completed the WebSocket
            // handshake. They are deliberately NOT closed here: a client that is
            // still mid-handshake would otherwise be dropped by an unrelated
            // broadcast. This matches the broadcast loop in the 'open' handler.
            if (!$server->exist($fd) || !$server->isEstablished($fd)) {
                continue;
            }

            $server->push($fd, $payload, SWOOLE_WEBSOCKET_OPCODE_TEXT, $flags);
        }
    });
});
// Logs the start of a server reload.
//
// Only the server instance is declared as a parameter: Swoole's documented
// BeforeReload callback receives just the server, and a second *required*
// parameter would raise an ArgumentCountError when the event fires. Any extra
// arguments a caller might pass are simply ignored by PHP.
$server->on('BeforeReload', function ($serv) {
    Console::success('Starting reload...');
});
// Logs the completion of a server reload.
//
// Only the server instance is declared as a parameter: Swoole's documented
// AfterReload callback receives just the server, and a second *required*
// parameter would raise an ArgumentCountError when the event fires. Any extra
// arguments a caller might pass are simply ignored by PHP.
$server->on('AfterReload', function ($serv) {
    Console::success('Reload completed...');
});
// $process = new Process(function($process) use ($server) {
// while (true) {
// $msg = $process->read();
// foreach($server->connections as $fd) {
// if ($server->exist($fd) && $server->isEstablished($fd)) {
// $server->push($fd, json_encode(['hey there']), SWOOLE_WEBSOCKET_OPCODE_TEXT, SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
// }
// }
// sleep(10);
// }
// });
// $server->addProcess($process);
2020-10-16 20:31:09 +13:00
// Server start-up hook: reports on the console that the server is running.
$server->on(
    "start",
    static function (Server $server): void {
        Console::success('Server started succefully');
    }
);
// New WebSocket connection: logs the connection, announces it (together with
// the connection count of the first listening port) to every established
// client, then greets the newly connected client with the current timestamp.
$server->on('open', function (Server $server, Swoole\Http\Request $request) {
    echo 'connection open: ' . $request->fd . "\n";

    $frameFlags = SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS;

    foreach ($server->connections as $clientFd) {
        // Only established WebSocket connections can receive frames.
        if (!$server->exist($clientFd) || !$server->isEstablished($clientFd)) {
            continue;
        }

        // The payload is rebuilt for every recipient, so the count reflects the
        // connection list at the moment of each push.
        $announcement = json_encode(['hey there', count($server->ports[0]->connections)]);
        $server->push($clientFd, $announcement, SWOOLE_WEBSOCKET_OPCODE_TEXT, $frameFlags);
    }

    $greeting = json_encode(["hello", time()]);
    $server->push($request->fd, $greeting);
});
// Incoming WebSocket frame: logs the received data and answers the sender with
// a JSON array holding a greeting tagged with the handling worker's id and the
// current timestamp.
$server->on('message', function (Server $server, Frame $frame) {
    echo 'received message: ' . $frame->data . "\n";

    $greeting = "hello, worker_id:" . $server->getWorkerId();
    $reply = json_encode([$greeting, time()]);

    $server->push($frame->fd, $reply);
});
// Connection closed: logs the file descriptor of the connection that went away.
$server->on(
    'close',
    function (Server $server, int $fd) {
        echo 'connection close: ' . $fd . "\n";
    }
);
// Start the server now that all event handlers have been registered.
$server->start();