introduce utopia-php/websocket
This commit is contained in:
parent
ad4ecea673
commit
923d373b6d
|
@ -190,7 +190,6 @@ $register->set('redisPool', function () {
|
|||
->withPort($redisPort)
|
||||
->withAuth($redisAuth)
|
||||
->withDbIndex(0)
|
||||
->withTimeout(1)
|
||||
);
|
||||
|
||||
return $pool;
|
||||
|
|
|
@ -32,6 +32,7 @@ foreach ([
|
|||
realpath(__DIR__ . '/../vendor/psr/log'),
|
||||
realpath(__DIR__ . '/../vendor/matomo'),
|
||||
realpath(__DIR__ . '/../vendor/symfony'),
|
||||
realpath(__DIR__ . '/../vendor/utopia-php/websocket'), // TODO: remove workerman autoload
|
||||
] as $key => $value) {
|
||||
if($value !== false) {
|
||||
$preloader->ignore($value);
|
||||
|
|
330
app/realtime.php
330
app/realtime.php
|
@ -1,14 +1,332 @@
|
|||
<?php
|
||||
|
||||
use Appwrite\Realtime\Server;
|
||||
use Appwrite\Database\Adapter\Redis as RedisAdapter;
|
||||
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
|
||||
use Appwrite\Database\Database;
|
||||
use Appwrite\Event\Event;
|
||||
use Appwrite\Network\Validator\Origin;
|
||||
use Appwrite\Realtime\Parser;
|
||||
use Swoole\Http\Request as SwooleRequest;
|
||||
use Swoole\Http\Response as SwooleResponse;
|
||||
use Swoole\Process;
|
||||
use Swoole\Runtime;
|
||||
use Swoole\Table;
|
||||
use Swoole\Timer;
|
||||
use Swoole\WebSocket\Frame;
|
||||
use Swoole\WebSocket\Server as SwooleServer;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\TimeLimit;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Swoole\Request;
|
||||
use Utopia\Swoole\Response;
|
||||
use Utopia\WebSocket\Server;
|
||||
use Utopia\WebSocket\Adapter;
|
||||
|
||||
require_once __DIR__ . '/init.php';
|
||||
|
||||
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||
|
||||
$config = [
|
||||
'package_max_length' => 64000 // Default maximum Package Size (64kb)
|
||||
];
|
||||
$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80));
|
||||
$adapter->setPackageMaxLength(64000); // Default maximum Package Size (64kb)
|
||||
|
||||
$realtimeServer = new Server($register, port: App::getEnv('PORT', 80), config: $config);
|
||||
$subscriptions = [];
|
||||
$connections = [];
|
||||
|
||||
$stats = new Table(4096, 1);
|
||||
$stats->column('projectId', Table::TYPE_STRING, 64);
|
||||
$stats->column('connections', Table::TYPE_INT);
|
||||
$stats->column('connectionsTotal', Table::TYPE_INT);
|
||||
$stats->column('messages', Table::TYPE_INT);
|
||||
$stats->create();
|
||||
|
||||
$server = new Server($adapter);
|
||||
|
||||
$server->onStart(function(SwooleServer $server) use ($stats) {
|
||||
Console::success('Server started succefully');
|
||||
Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}");
|
||||
|
||||
Timer::tick(10000, function () use ($stats) {
|
||||
foreach ($stats as $projectId => $value) {
|
||||
if (empty($value['connections']) && empty($value['messages'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$connections = $value['connections'];
|
||||
$messages = $value['messages'];
|
||||
|
||||
$usage = new Event('v1-usage', 'UsageV1');
|
||||
$usage
|
||||
->setParam('projectId', $projectId)
|
||||
->setParam('realtimeConnections', $connections)
|
||||
->setParam('realtimeMessages', $messages)
|
||||
->setParam('networkRequestSize', 0)
|
||||
->setParam('networkResponseSize', 0);
|
||||
|
||||
$stats->set($projectId, [
|
||||
'projectId' => $projectId,
|
||||
'messages' => 0,
|
||||
'connections' => 0
|
||||
]);
|
||||
|
||||
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
$usage->trigger();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Process::signal(2, function () use ($server) {
|
||||
Console::log('Stop by Ctrl+C');
|
||||
$server->shutdown();
|
||||
});
|
||||
});
|
||||
|
||||
$server->onWorkerStart(function(SwooleServer $swooleServer, int $workerId) use ($server, $register, $stats, &$subscriptions, &$connections) {
|
||||
Console::success('Worker ' . $workerId . ' started succefully');
|
||||
|
||||
$attempts = 0;
|
||||
$start = time();
|
||||
$redisPool = $register->get('redisPool');
|
||||
|
||||
/**
|
||||
* Sending current connections to project channels on the console project every 5 seconds.
|
||||
*/
|
||||
Timer::tick(5000, function () use ($server, $stats, &$subscriptions) {
|
||||
if (
|
||||
array_key_exists('console', $subscriptions)
|
||||
&& array_key_exists('role:member', $subscriptions['console'])
|
||||
&& array_key_exists('project', $subscriptions['console']['role:member'])
|
||||
) {
|
||||
$payload = [];
|
||||
foreach ($stats as $projectId => $value) {
|
||||
$payload[$projectId] = $value['connectionsTotal'];
|
||||
}
|
||||
$server->send(array_keys($subscriptions['console']['role:member']['project']), json_encode([
|
||||
'event' => 'stats.connections',
|
||||
'channels' => ['project'],
|
||||
'timestamp' => time(),
|
||||
'payload' => $payload
|
||||
]));
|
||||
}
|
||||
});
|
||||
|
||||
while ($attempts < 300) {
|
||||
try {
|
||||
if ($attempts > 0) {
|
||||
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
|
||||
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
|
||||
sleep(5); // 5 sec delay between connection attempts
|
||||
}
|
||||
$start = time();
|
||||
|
||||
/** @var Redis $redis */
|
||||
$redis = $redisPool->get();
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
if ($redis->ping(true)) {
|
||||
$attempts = 0;
|
||||
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
|
||||
} else {
|
||||
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
|
||||
}
|
||||
|
||||
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, $stats, $register, &$connections, &$subscriptions) {
|
||||
$event = json_decode($payload, true);
|
||||
|
||||
if ($event['permissionsChanged'] && isset($event['userId'])) {
|
||||
$project = $event['project'];
|
||||
$userId = $event['userId'];
|
||||
|
||||
if (array_key_exists($project, $subscriptions) && array_key_exists('user:'.$userId, $subscriptions[$project])) {
|
||||
$connection = array_key_first(reset($subscriptions[$project]['user:'.$userId]));
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is redundant soon and will be gone with merging the usage branch.
|
||||
*/
|
||||
$db = $register->get('dbPool')->get();
|
||||
$cache = $register->get('redisPool')->get();
|
||||
|
||||
$projectDB = new Database();
|
||||
$projectDB->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
|
||||
$projectDB->setNamespace('app_'.$project);
|
||||
$projectDB->setMocks(Config::getParam('collections', []));
|
||||
|
||||
$user = $projectDB->getDocument($userId);
|
||||
|
||||
Parser::setUser($user);
|
||||
|
||||
$roles = Parser::getRoles();
|
||||
|
||||
Parser::subscribe($project, $connection, $roles, $subscriptions, $connections, $connections[$connection]['channels']);
|
||||
|
||||
$register->get('dbPool')->put($db);
|
||||
$register->get('redisPool')->put($cache);
|
||||
}
|
||||
|
||||
$receivers = Parser::identifyReceivers($event, $subscriptions);
|
||||
|
||||
// Temporarily print debug logs by default for Alpha testing.
|
||||
// if (App::isDevelopment() && !empty($receivers)) {
|
||||
if (!empty($receivers)) {
|
||||
Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
|
||||
Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers));
|
||||
Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
|
||||
}
|
||||
|
||||
$server->send(
|
||||
$receivers,
|
||||
json_encode($event['data'])
|
||||
);
|
||||
|
||||
if (($num = count($receivers)) > 0) {
|
||||
$stats->incr($event['project'], 'messages', $num);
|
||||
}
|
||||
});
|
||||
} catch (\Throwable $th) {
|
||||
Console::error('Pub/sub error: ' . $th->getMessage());
|
||||
$redisPool->put($redis);
|
||||
$attempts++;
|
||||
continue;
|
||||
}
|
||||
|
||||
$attempts++;
|
||||
}
|
||||
|
||||
Console::error('Failed to restart pub/sub...');
|
||||
});
|
||||
|
||||
$server->onOpen(function(SwooleServer $swooleServer, SwooleRequest $request) use ($server, $register, $stats, &$subscriptions, &$connections) {
|
||||
$app = new App('UTC');
|
||||
$connection = $request->fd;
|
||||
$request = new Request($request);
|
||||
|
||||
/** @var PDO $db */
|
||||
$db = $register->get('dbPool')->get();
|
||||
/** @var Redis $redis */
|
||||
$redis = $register->get('redisPool')->get();
|
||||
|
||||
Console::info("Connection open (user: {$connection}, worker: {$swooleServer->getWorkerId()})");
|
||||
|
||||
App::setResource('db', function () use (&$db) {
|
||||
return $db;
|
||||
});
|
||||
|
||||
App::setResource('cache', function () use (&$redis) {
|
||||
return $redis;
|
||||
});
|
||||
|
||||
App::setResource('request', function () use ($request) {
|
||||
return $request;
|
||||
});
|
||||
|
||||
App::setResource('response', function () {
|
||||
return new Response(new SwooleResponse());
|
||||
});
|
||||
|
||||
try {
|
||||
/** @var \Appwrite\Database\Document $user */
|
||||
$user = $app->getResource('user');
|
||||
|
||||
/** @var \Appwrite\Database\Document $project */
|
||||
$project = $app->getResource('project');
|
||||
|
||||
/** @var \Appwrite\Database\Document $console */
|
||||
$console = $app->getResource('console');
|
||||
|
||||
/*
|
||||
* Project Check
|
||||
*/
|
||||
if (empty($project->getId())) {
|
||||
throw new Exception('Missing or unknown project ID', 1008);
|
||||
}
|
||||
|
||||
/*
|
||||
* Abuse Check
|
||||
*
|
||||
* Abuse limits are connecting 128 times per minute and ip address.
|
||||
*/
|
||||
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, function () use (&$db) {
|
||||
return $db;
|
||||
});
|
||||
$timeLimit
|
||||
->setNamespace('app_' . $project->getId())
|
||||
->setParam('{ip}', $request->getIP())
|
||||
->setParam('{url}', $request->getURI());
|
||||
|
||||
$abuse = new Abuse($timeLimit);
|
||||
|
||||
if ($abuse->check() && App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled') {
|
||||
throw new Exception('Too many requests', 1013);
|
||||
}
|
||||
|
||||
/*
|
||||
* Validate Client Domain - Check to avoid CSRF attack.
|
||||
* Adding Appwrite API domains to allow XDOMAIN communication.
|
||||
* Skip this check for non-web platforms which are not required to send an origin header.
|
||||
*/
|
||||
$origin = $request->getOrigin();
|
||||
$originValidator = new Origin(\array_merge($project->getAttribute('platforms', []), $console->getAttribute('platforms', [])));
|
||||
|
||||
if (!$originValidator->isValid($origin) && $project->getId() !== 'console') {
|
||||
throw new Exception($originValidator->getDescription(), 1008);
|
||||
}
|
||||
|
||||
Parser::setUser($user);
|
||||
|
||||
$roles = Parser::getRoles();
|
||||
$channels = Parser::parseChannels($request->getQuery('channels', []));
|
||||
|
||||
/**
|
||||
* Channels Check
|
||||
*/
|
||||
if (empty($channels)) {
|
||||
throw new Exception('Missing channels', 1008);
|
||||
}
|
||||
|
||||
Parser::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels);
|
||||
|
||||
$server->send([$connection], json_encode($channels));
|
||||
|
||||
$stats->incr($project->getId(), 'connections');
|
||||
$stats->incr($project->getId(), 'connectionsTotal');
|
||||
} catch (\Throwable $th) {
|
||||
$response = [
|
||||
'code' => $th->getCode(),
|
||||
'message' => $th->getMessage()
|
||||
];
|
||||
// Temporarily print debug logs by default for Alpha testing.
|
||||
//if (App::isDevelopment()) {
|
||||
Console::error("[Error] Connection Error");
|
||||
Console::error("[Error] Code: " . $response['code']);
|
||||
Console::error("[Error] Message: " . $response['message']);
|
||||
//}
|
||||
$server->send([$connection], json_encode($response));
|
||||
$server->close($connection, $th->getCode());
|
||||
} finally {
|
||||
/**
|
||||
* Put used PDO and Redis Connections back into their pools.
|
||||
*/
|
||||
$register->get('dbPool')->put($db);
|
||||
$register->get('redisPool')->put($redis);
|
||||
}
|
||||
});
|
||||
|
||||
$server->onMessage(function(SwooleServer $swooleServer, Frame $frame) use ($server) {
|
||||
$connection = $frame->fd;
|
||||
$server->send([$connection], 'Sending messages is not allowed.');
|
||||
$server->close($connection, 1003);
|
||||
});
|
||||
|
||||
$server->onClose(function(SwooleServer $server, int $connection) use (&$connections, &$subscriptions, $stats) {
|
||||
if (array_key_exists($connection, $connections)) {
|
||||
$stats->decr($connections[$connection]['projectId'], 'connectionsTotal');
|
||||
}
|
||||
Parser::unsubscribe($connection, $subscriptions, $connections);
|
||||
Console::info('Connection close: ' . $connection);
|
||||
});
|
||||
|
||||
$server->start();
|
|
@ -52,6 +52,7 @@
|
|||
"utopia-php/swoole": "0.2.*",
|
||||
"utopia-php/storage": "0.5.*",
|
||||
"utopia-php/image": "0.3.*",
|
||||
"utopia-php/websocket": "dev-main",
|
||||
"resque/php-resque": "1.3.6",
|
||||
"matomo/device-detector": "4.2.2",
|
||||
"dragonmantank/cron-expression": "3.1.0",
|
||||
|
@ -68,6 +69,12 @@
|
|||
"phpunit/phpunit": "9.5.4",
|
||||
"vimeo/psalm": "4.7.2"
|
||||
},
|
||||
"repositories": [
|
||||
{
|
||||
"type": "vcs",
|
||||
"url": "https://github.com/utopia-php/websocket"
|
||||
}
|
||||
],
|
||||
"provide": {
|
||||
"ext-phpiredis": "*"
|
||||
},
|
||||
|
|
77
composer.lock
generated
77
composer.lock
generated
|
@ -4,7 +4,7 @@
|
|||
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
|
||||
"This file is @generated automatically"
|
||||
],
|
||||
"content-hash": "ba882fb2e25e5d4304a6574986a6a7e9",
|
||||
"content-hash": "3cf401f31fafb8aa517415681d53c516",
|
||||
"packages": [
|
||||
{
|
||||
"name": "adhocore/jwt",
|
||||
|
@ -115,16 +115,16 @@
|
|||
},
|
||||
{
|
||||
"name": "appwrite/php-runtimes",
|
||||
"version": "dev-main",
|
||||
"version": "0.3.0",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/appwrite/php-runtimes.git",
|
||||
"reference": "cc7090a67d8824c779190b38873f0f8154f906b2"
|
||||
"reference": "39be003cdff22c8447de151921001eb5d3bf2319"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://api.github.com/repos/appwrite/php-runtimes/zipball/cc7090a67d8824c779190b38873f0f8154f906b2",
|
||||
"reference": "cc7090a67d8824c779190b38873f0f8154f906b2",
|
||||
"url": "https://api.github.com/repos/appwrite/php-runtimes/zipball/39be003cdff22c8447de151921001eb5d3bf2319",
|
||||
"reference": "39be003cdff22c8447de151921001eb5d3bf2319",
|
||||
"shasum": ""
|
||||
},
|
||||
"require": {
|
||||
|
@ -136,7 +136,6 @@
|
|||
"utopia-php/cli": "0.11.*",
|
||||
"vimeo/psalm": "4.0.1"
|
||||
},
|
||||
"default-branch": true,
|
||||
"type": "library",
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
|
@ -145,7 +144,7 @@
|
|||
},
|
||||
"notification-url": "https://packagist.org/downloads/",
|
||||
"license": [
|
||||
"BSD-3-Clause"
|
||||
"BSD-3"
|
||||
],
|
||||
"authors": [
|
||||
{
|
||||
|
@ -165,9 +164,9 @@
|
|||
],
|
||||
"support": {
|
||||
"issues": "https://github.com/appwrite/php-runtimes/issues",
|
||||
"source": "https://github.com/appwrite/php-runtimes/tree/main"
|
||||
"source": "https://github.com/appwrite/php-runtimes/tree/0.3.0"
|
||||
},
|
||||
"time": "2021-06-23T07:17:12+00:00"
|
||||
"time": "2021-06-15T07:52:43+00:00"
|
||||
},
|
||||
{
|
||||
"name": "chillerlan/php-qrcode",
|
||||
|
@ -2113,6 +2112,64 @@
|
|||
},
|
||||
"time": "2021-02-04T14:14:49+00:00"
|
||||
},
|
||||
{
|
||||
"name": "utopia-php/websocket",
|
||||
"version": "dev-main",
|
||||
"source": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/utopia-php/websocket.git",
|
||||
"reference": "d08b0b1b29b7dea3c62d2ed3aab6ac872c382e76"
|
||||
},
|
||||
"dist": {
|
||||
"type": "zip",
|
||||
"url": "https://api.github.com/repos/utopia-php/websocket/zipball/d08b0b1b29b7dea3c62d2ed3aab6ac872c382e76",
|
||||
"reference": "d08b0b1b29b7dea3c62d2ed3aab6ac872c382e76",
|
||||
"shasum": ""
|
||||
},
|
||||
"require": {
|
||||
"php": ">=8.0"
|
||||
},
|
||||
"require-dev": {
|
||||
"phpunit/phpunit": "^9.5.5",
|
||||
"swoole/ide-helper": "4.6.6",
|
||||
"textalk/websocket": "1.5.2",
|
||||
"vimeo/psalm": "^4.8.1",
|
||||
"workerman/workerman": "^4.0"
|
||||
},
|
||||
"default-branch": true,
|
||||
"type": "library",
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"Utopia\\WebSocket\\": "src/WebSocket"
|
||||
}
|
||||
},
|
||||
"license": [
|
||||
"MIT"
|
||||
],
|
||||
"authors": [
|
||||
{
|
||||
"name": "Eldad Fux",
|
||||
"email": "eldad@appwrite.io"
|
||||
},
|
||||
{
|
||||
"name": "Torsten Dittmann",
|
||||
"email": "torsten@appwrite.io"
|
||||
}
|
||||
],
|
||||
"description": "A simple abstraction for WebSocket servers.",
|
||||
"keywords": [
|
||||
"framework",
|
||||
"php",
|
||||
"upf",
|
||||
"utopia",
|
||||
"websocket"
|
||||
],
|
||||
"support": {
|
||||
"source": "https://github.com/utopia-php/websocket/tree/main",
|
||||
"issues": "https://github.com/utopia-php/websocket/issues"
|
||||
},
|
||||
"time": "2021-06-24T10:54:56+00:00"
|
||||
},
|
||||
{
|
||||
"name": "webmozart/assert",
|
||||
"version": "1.10.0",
|
||||
|
@ -6054,7 +6111,7 @@
|
|||
"aliases": [],
|
||||
"minimum-stability": "stable",
|
||||
"stability-flags": {
|
||||
"appwrite/php-runtimes": 20
|
||||
"utopia-php/websocket": 20
|
||||
},
|
||||
"prefer-stable": false,
|
||||
"prefer-lowest": false,
|
||||
|
|
|
@ -527,15 +527,15 @@ services:
|
|||
networks:
|
||||
- appwrite
|
||||
|
||||
# redis-commander:
|
||||
# image: rediscommander/redis-commander:latest
|
||||
# restart: unless-stopped
|
||||
# networks:
|
||||
# - appwrite
|
||||
# environment:
|
||||
# - REDIS_HOSTS=redis
|
||||
# ports:
|
||||
# - "8081:8081"
|
||||
redis-commander:
|
||||
image: rediscommander/redis-commander:latest
|
||||
restart: unless-stopped
|
||||
networks:
|
||||
- appwrite
|
||||
environment:
|
||||
- REDIS_HOSTS=redis
|
||||
ports:
|
||||
- "8081:8081"
|
||||
|
||||
# resque:
|
||||
# image: appwrite/resque-web:1.1.0
|
||||
|
|
|
@ -1,423 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Realtime;
|
||||
|
||||
use Appwrite\Database\Database;
|
||||
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
|
||||
use Appwrite\Database\Adapter\Redis as RedisAdapter;
|
||||
use Appwrite\Event\Event;
|
||||
use Appwrite\Network\Validator\Origin;
|
||||
use Appwrite\Utopia\Response;
|
||||
use Exception;
|
||||
use Swoole\Http\Request;
|
||||
use Swoole\Http\Response as SwooleResponse;
|
||||
use Swoole\Process;
|
||||
use Swoole\Table;
|
||||
use Swoole\Timer;
|
||||
use Swoole\WebSocket\Frame;
|
||||
use Swoole\WebSocket\Server as SwooleServer;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\TimeLimit;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Exception as UtopiaException;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\Swoole\Request as SwooleRequest;
|
||||
|
||||
|
||||
class Server
|
||||
{
|
||||
private Registry $register;
|
||||
private SwooleServer $server;
|
||||
private Table $stats;
|
||||
private array $subscriptions;
|
||||
private array $connections;
|
||||
|
||||
public function __construct(Registry &$register, $host = '0.0.0.0', $port = 80, $config = [])
|
||||
{
|
||||
$this->subscriptions = [];
|
||||
$this->connections = [];
|
||||
$this->register = $register;
|
||||
|
||||
$this->stats = new Table(4096, 1);
|
||||
$this->stats->column('projectId', Table::TYPE_STRING, 64);
|
||||
$this->stats->column('connections', Table::TYPE_INT);
|
||||
$this->stats->column('connectionsTotal', Table::TYPE_INT);
|
||||
$this->stats->column('messages', Table::TYPE_INT);
|
||||
$this->stats->create();
|
||||
|
||||
$this->server = new SwooleServer($host, $port, SWOOLE_PROCESS);
|
||||
$this->server->set($config);
|
||||
$this->server->on('start', [$this, 'onStart']);
|
||||
$this->server->on('workerStart', [$this, 'onWorkerStart']);
|
||||
$this->server->on('open', [$this, 'onOpen']);
|
||||
$this->server->on('message', [$this, 'onMessage']);
|
||||
$this->server->on('close', [$this, 'onClose']);
|
||||
$this->server->start();
|
||||
}
|
||||
|
||||
/**
|
||||
* This is executed when the Realtime server starts.
|
||||
* @param SwooleServer $server
|
||||
* @return void
|
||||
*/
|
||||
public function onStart(SwooleServer $server): void
|
||||
{
|
||||
Console::success('Server started succefully');
|
||||
Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}");
|
||||
|
||||
Timer::tick(10000, function () {
|
||||
/** @var Table $stats */
|
||||
foreach ($this->stats as $projectId => $value) {
|
||||
if (empty($value['connections']) && empty($value['messages'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$connections = $value['connections'];
|
||||
$messages = $value['messages'];
|
||||
|
||||
$usage = new Event('v1-usage', 'UsageV1');
|
||||
$usage
|
||||
->setParam('projectId', $projectId)
|
||||
->setParam('realtimeConnections', $connections)
|
||||
->setParam('realtimeMessages', $messages)
|
||||
->setParam('networkRequestSize', 0)
|
||||
->setParam('networkResponseSize', 0);
|
||||
|
||||
$this->stats->set($projectId, [
|
||||
'projectId' => $projectId,
|
||||
'messages' => 0,
|
||||
'connections' => 0
|
||||
]);
|
||||
|
||||
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
$usage->trigger();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Process::signal(2, function () use ($server) {
|
||||
Console::log('Stop by Ctrl+C');
|
||||
$server->shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* This is executed when a WebSocket worker process starts.
|
||||
* @param SwooleServer $server
|
||||
* @param int $workerId
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function onWorkerStart(SwooleServer $server, int $workerId): void
|
||||
{
|
||||
Console::success('Worker ' . $workerId . ' started succefully');
|
||||
|
||||
$attempts = 0;
|
||||
$start = time();
|
||||
$redisPool = $this->register->get('redisPool');
|
||||
|
||||
/**
|
||||
* Sending current connections to project channels on the console project every 5 seconds.
|
||||
*/
|
||||
$server->tick(5000, function () use (&$server) {
|
||||
$this->tickSendProjectUsage($server);
|
||||
});
|
||||
|
||||
while ($attempts < 300) {
|
||||
try {
|
||||
if ($attempts > 0) {
|
||||
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
|
||||
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
|
||||
sleep(5); // 5 sec delay between connection attempts
|
||||
}
|
||||
|
||||
/** @var Swoole\Coroutine\Redis $redis */
|
||||
$redis = $redisPool->get();
|
||||
|
||||
if ($redis->ping(true)) {
|
||||
$attempts = 0;
|
||||
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
|
||||
} else {
|
||||
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
|
||||
}
|
||||
|
||||
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId) {
|
||||
$this->onRedisPublish($payload, $server, $workerId);
|
||||
});
|
||||
} catch (\Throwable $th) {
|
||||
Console::error('Pub/sub error: ' . $th->getMessage());
|
||||
$redisPool->put($redis);
|
||||
$attempts++;
|
||||
continue;
|
||||
}
|
||||
|
||||
$attempts++;
|
||||
}
|
||||
|
||||
Console::error('Failed to restart pub/sub...');
|
||||
}
|
||||
|
||||
/**
|
||||
* This is executed when a new Realtime connection is established.
|
||||
* @param SwooleServer $server
|
||||
* @param Request $request
|
||||
* @return void
|
||||
* @throws Exception
|
||||
* @throws UtopiaException
|
||||
*/
|
||||
public function onOpen(SwooleServer $server, Request $request): void
|
||||
{
|
||||
$app = new App('UTC');
|
||||
$connection = $request->fd;
|
||||
$request = new SwooleRequest($request);
|
||||
|
||||
$db = $this->register->get('dbPool')->get();
|
||||
$redis = $this->register->get('redisPool')->get();
|
||||
|
||||
Console::info("Connection open (user: {$connection}, worker: {$server->getWorkerId()})");
|
||||
|
||||
App::setResource('db', function () use (&$db) {
|
||||
return $db;
|
||||
});
|
||||
|
||||
App::setResource('cache', function () use (&$redis) {
|
||||
return $redis;
|
||||
});
|
||||
|
||||
App::setResource('request', function () use ($request) {
|
||||
return $request;
|
||||
});
|
||||
|
||||
App::setResource('response', function () {
|
||||
return new Response(new SwooleResponse());
|
||||
});
|
||||
|
||||
try {
|
||||
/** @var \Appwrite\Database\Document $user */
|
||||
$user = $app->getResource('user');
|
||||
|
||||
/** @var \Appwrite\Database\Document $project */
|
||||
$project = $app->getResource('project');
|
||||
|
||||
/** @var \Appwrite\Database\Document $console */
|
||||
$console = $app->getResource('console');
|
||||
|
||||
/*
|
||||
* Project Check
|
||||
*/
|
||||
if (empty($project->getId())) {
|
||||
throw new Exception('Missing or unknown project ID', 1008);
|
||||
}
|
||||
|
||||
/*
|
||||
* Abuse Check
|
||||
*
|
||||
* Abuse limits are connecting 128 times per minute and ip address.
|
||||
*/
|
||||
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, function () use (&$db) {
|
||||
return $db;
|
||||
});
|
||||
$timeLimit
|
||||
->setNamespace('app_' . $project->getId())
|
||||
->setParam('{ip}', $request->getIP())
|
||||
->setParam('{url}', $request->getURI());
|
||||
|
||||
$abuse = new Abuse($timeLimit);
|
||||
|
||||
if ($abuse->check() && App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled') {
|
||||
throw new Exception('Too many requests', 1013);
|
||||
}
|
||||
|
||||
/*
|
||||
* Validate Client Domain - Check to avoid CSRF attack.
|
||||
* Adding Appwrite API domains to allow XDOMAIN communication.
|
||||
* Skip this check for non-web platforms which are not required to send an origin header.
|
||||
*/
|
||||
$origin = $request->getOrigin();
|
||||
$originValidator = new Origin(\array_merge($project->getAttribute('platforms', []), $console->getAttribute('platforms', [])));
|
||||
|
||||
if (!$originValidator->isValid($origin) && $project->getId() !== 'console') {
|
||||
throw new Exception($originValidator->getDescription(), 1008);
|
||||
}
|
||||
|
||||
Parser::setUser($user);
|
||||
|
||||
$roles = Parser::getRoles();
|
||||
$channels = Parser::parseChannels($request->getQuery('channels', []));
|
||||
|
||||
/**
|
||||
* Channels Check
|
||||
*/
|
||||
if (empty($channels)) {
|
||||
throw new Exception('Missing channels', 1008);
|
||||
}
|
||||
|
||||
Parser::subscribe($project->getId(), $connection, $roles, $this->subscriptions, $this->connections, $channels);
|
||||
|
||||
$server->push($connection, json_encode($channels));
|
||||
|
||||
$this->stats->incr($project->getId(), 'connections');
|
||||
$this->stats->incr($project->getId(), 'connectionsTotal');
|
||||
} catch (\Throwable $th) {
|
||||
$response = [
|
||||
'code' => $th->getCode(),
|
||||
'message' => $th->getMessage()
|
||||
];
|
||||
// Temporarily print debug logs by default for Alpha testing.
|
||||
//if (App::isDevelopment()) {
|
||||
Console::error("[Error] Connection Error");
|
||||
Console::error("[Error] Code: " . $response['code']);
|
||||
Console::error("[Error] Message: " . $response['message']);
|
||||
//}
|
||||
$server->push($connection, json_encode($response));
|
||||
$server->close($connection);
|
||||
}
|
||||
/**
|
||||
* Put used PDO and Redis Connections back into their pools.
|
||||
*/
|
||||
/** @var PDOPool $dbPool */
|
||||
$dbPool = $this->register->get('dbPool');
|
||||
$dbPool->put($db);
|
||||
|
||||
/** @var RedisPool $redisPool */
|
||||
$redisPool = $this->register->get('redisPool');
|
||||
$redisPool->put($redis);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is executed when a message is received by the Realtime server.
|
||||
* @param SwooleServer $server
|
||||
* @param Frame $frame
|
||||
* @return void
|
||||
*/
|
||||
public function onMessage(SwooleServer $server, Frame $frame)
|
||||
{
|
||||
$server->push($frame->fd, 'Sending messages is not allowed.');
|
||||
$server->close($frame->fd);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is executed when a Realtime connection is closed.
|
||||
* @param SwooleServer $server
|
||||
* @param int $connection
|
||||
* @return void
|
||||
*/
|
||||
public function onClose(SwooleServer $server, int $connection)
|
||||
{
|
||||
if (array_key_exists($connection, $this->connections)) {
|
||||
$this->stats->decr($this->connections[$connection]['projectId'], 'connectionsTotal');
|
||||
}
|
||||
Parser::unsubscribe($connection, $this->subscriptions, $this->connections);
|
||||
Console::info('Connection close: ' . $connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is executed when an event is published on realtime channel in Redis.
|
||||
* @param string $payload
|
||||
* @param SwooleServer $server
|
||||
* @param int $workerId
|
||||
* @return void
|
||||
*/
|
||||
public function onRedisPublish(string $payload, SwooleServer &$server, int $workerId)
|
||||
{
|
||||
$event = json_decode($payload, true);
|
||||
|
||||
if ($event['permissionsChanged'] && isset($event['userId'])) {
|
||||
$this->addPermission($event);
|
||||
}
|
||||
|
||||
$receivers = Parser::identifyReceivers($event, $this->subscriptions);
|
||||
|
||||
// Temporarily print debug logs by default for Alpha testing.
|
||||
// if (App::isDevelopment() && !empty($receivers)) {
|
||||
if (!empty($receivers)) {
|
||||
Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
|
||||
Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers));
|
||||
Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
|
||||
}
|
||||
|
||||
foreach ($receivers as $receiver) {
|
||||
if ($server->exist($receiver) && $server->isEstablished($receiver)) {
|
||||
$server->push(
|
||||
$receiver,
|
||||
json_encode($event['data']),
|
||||
SWOOLE_WEBSOCKET_OPCODE_TEXT,
|
||||
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS
|
||||
);
|
||||
} else {
|
||||
$server->close($receiver);
|
||||
}
|
||||
}
|
||||
if (($num = count($receivers)) > 0) {
|
||||
$this->stats->incr($event['project'], 'messages', $num);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This sends the usage to the `console` channel.
|
||||
* @param SwooleServer $server
|
||||
* @return void
|
||||
*/
|
||||
public function tickSendProjectUsage(SwooleServer &$server)
|
||||
{
|
||||
if (
|
||||
array_key_exists('console', $this->subscriptions)
|
||||
&& array_key_exists('role:member', $this->subscriptions['console'])
|
||||
&& array_key_exists('project', $this->subscriptions['console']['role:member'])
|
||||
) {
|
||||
$payload = [];
|
||||
foreach ($this->stats as $projectId => $value) {
|
||||
$payload[$projectId] = $value['connectionsTotal'];
|
||||
}
|
||||
foreach ($this->subscriptions['console']['role:member']['project'] as $connection => $value) {
|
||||
$server->push(
|
||||
$connection,
|
||||
json_encode([
|
||||
'event' => 'stats.connections',
|
||||
'channels' => ['project'],
|
||||
'timestamp' => time(),
|
||||
'payload' => $payload
|
||||
]),
|
||||
SWOOLE_WEBSOCKET_OPCODE_TEXT,
|
||||
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function addPermission(array $event)
|
||||
{
|
||||
$project = $event['project'];
|
||||
$userId = $event['userId'];
|
||||
|
||||
if (array_key_exists($project, $this->subscriptions) && array_key_exists('user:'.$userId, $this->subscriptions[$project])) {
|
||||
$connection = array_key_first(reset($this->subscriptions[$project]['user:'.$userId]));
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is redundant soon and will be gone with merging the usage branch.
|
||||
*/
|
||||
$db = $this->register->get('dbPool')->get();
|
||||
$cache = $this->register->get('redisPool')->get();
|
||||
|
||||
$projectDB = new Database();
|
||||
$projectDB->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
|
||||
$projectDB->setNamespace('app_'.$project);
|
||||
$projectDB->setMocks(Config::getParam('collections', []));
|
||||
|
||||
$user = $projectDB->getDocument($userId);
|
||||
|
||||
Parser::setUser($user);
|
||||
|
||||
$roles = Parser::getRoles();
|
||||
|
||||
Parser::subscribe($project, $connection, $roles, $this->subscriptions, $this->connections, $this->connections[$connection]['channels']);
|
||||
|
||||
$this->register->get('dbPool')->put($db);
|
||||
$this->register->get('redisPool')->put($cache);
|
||||
}
|
||||
}
|
|
@ -8,11 +8,11 @@ export let options = {
|
|||
stages: [
|
||||
{
|
||||
duration: '10s',
|
||||
target: 10
|
||||
target: 500
|
||||
},
|
||||
{
|
||||
duration: '30m',
|
||||
target: 10
|
||||
duration: '1m',
|
||||
target: 500
|
||||
},
|
||||
],
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ trait RealtimeBase
|
|||
];
|
||||
return new WebSocketClient('ws://appwrite-traefik/v1/realtime?' . http_build_query($query), [
|
||||
'headers' => $headers,
|
||||
'timeout' => 60,
|
||||
'timeout' => 30,
|
||||
]);
|
||||
}
|
||||
|
||||
|
@ -622,7 +622,7 @@ trait RealtimeBase
|
|||
'x-appwrite-key' => $this->getProject()['apiKey']
|
||||
]), [
|
||||
'name' => 'Test',
|
||||
'env' => 'php-7.4',
|
||||
'runtime' => 'php-7.4',
|
||||
'execute' => ['*'],
|
||||
'timeout' => 10,
|
||||
]);
|
||||
|
|
Loading…
Reference in a new issue