1
0
Fork 0
mirror of synced 2024-06-29 19:50:26 +12:00
appwrite/app/realtime.php

230 lines
7.2 KiB
PHP
Raw Normal View History

2020-10-16 20:31:09 +13:00
<?php
2021-02-25 06:12:38 +13:00
require_once __DIR__ . '/init.php';
2020-10-20 04:09:53 +13:00
2021-03-02 05:02:01 +13:00
use Appwrite\Network\Validator\Origin;
use Appwrite\Realtime\Realtime;
2021-03-10 05:07:13 +13:00
use Appwrite\Utopia\Response;
use Swoole\Database\PDOProxy;
2021-03-10 20:43:10 +13:00
use Swoole\Process;
2020-10-20 04:09:53 +13:00
use Swoole\Http\Request;
2021-03-10 05:07:13 +13:00
use Swoole\Http\Response as SwooleResponse;
2020-10-16 20:31:09 +13:00
use Swoole\WebSocket\Frame;
2021-03-10 20:43:10 +13:00
use Swoole\WebSocket\Server;
2020-10-21 02:22:46 +13:00
use Utopia\App;
2020-10-19 00:51:16 +13:00
use Utopia\CLI\Console;
2021-02-22 10:22:32 +13:00
use Utopia\Swoole\Request as SwooleRequest;
use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit;
2021-02-25 06:12:38 +13:00
2020-10-16 20:31:09 +13:00
/**
* TODO List
*
* - JWT Authentication (in path / or in message)
2020-10-17 18:48:03 +13:00
*
2020-10-21 02:22:46 +13:00
* Protocols Support:
* - Websocket support: https://www.swoole.co.uk/docs/modules/swoole-websocket-server
2020-10-16 20:31:09 +13:00
*/
2020-10-20 04:09:53 +13:00
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
2020-10-19 00:51:16 +13:00
2021-03-11 00:26:38 +13:00
$register->set('db', function () use ($register) {
$pool = $register->get('dbPool');
$pdo = $pool->get()->__getObject();
return $pdo;
}, true);
$register->set('cache', function () use ($register) { // Register cache connection
$redis = $register->get('redisPool')->get();
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
return $redis;
}, true);
2021-02-26 22:21:07 +13:00
$server = new Server('0.0.0.0', 80);
2021-02-25 06:12:38 +13:00
$server->set([
2021-03-02 04:13:10 +13:00
'package_max_length' => 64000 // Default maximum Package Size (64kb)
2021-02-25 23:43:39 +13:00
]);
2021-02-26 22:21:07 +13:00
2020-10-21 23:50:11 +13:00
$subscriptions = [];
2020-10-22 01:03:50 +13:00
$connections = [];
2020-10-19 00:51:16 +13:00
2021-02-26 22:21:07 +13:00
$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$connections, &$register) {
2021-03-10 06:27:48 +13:00
Console::success('Worker ' . $workerId . ' started succefully');
2021-03-10 05:07:13 +13:00
2020-10-20 04:09:53 +13:00
$attempts = 0;
$start = time();
2021-02-25 06:12:38 +13:00
2020-10-20 09:38:49 +13:00
while ($attempts < 300) {
2020-10-20 04:09:53 +13:00
try {
2021-02-25 06:12:38 +13:00
if ($attempts > 0) {
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
2021-02-26 22:21:07 +13:00
sleep(5); // 5 sec delay between connection attempts
2020-10-20 09:38:49 +13:00
}
2021-03-10 19:53:49 +13:00
$redis = $register->get('cache');
2020-10-20 04:09:53 +13:00
2021-02-25 06:12:38 +13:00
if ($redis->ping(true)) {
2020-10-20 04:09:53 +13:00
$attempts = 0;
2021-02-25 06:12:38 +13:00
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
} else {
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
2020-10-20 07:56:02 +13:00
}
2021-02-22 10:22:32 +13:00
2021-02-25 23:43:39 +13:00
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, &$connections, &$subscriptions) {
2021-02-22 10:22:32 +13:00
/**
* Supported Resources:
* - Collection
* - Document
* - File
2021-03-04 22:28:24 +13:00
* - Account
* - Session
* - Team? (not implemented yet)
* - Membership? (not implemented yet)
* - Function? (not available yet)
* - Execution? (not available yet)
2021-02-22 10:22:32 +13:00
*/
2021-02-25 23:43:39 +13:00
$event = json_decode($payload, true);
2021-02-25 06:12:38 +13:00
$receivers = Realtime::identifyReceivers($event, $subscriptions);
2021-02-25 06:12:38 +13:00
foreach ($receivers as $receiver) {
2021-02-25 23:43:39 +13:00
if ($server->exist($receiver) && $server->isEstablished($receiver)) {
$server->push(
$receiver,
json_encode($event['data']),
SWOOLE_WEBSOCKET_OPCODE_TEXT,
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS
);
} else {
2021-02-25 06:12:38 +13:00
$server->close($receiver);
2021-02-22 10:22:32 +13:00
}
}
2020-10-20 04:09:53 +13:00
});
} catch (\Throwable $th) {
2021-02-25 06:12:38 +13:00
Console::error('Pub/sub error: ' . $th->getMessage());
2020-10-20 04:09:53 +13:00
$attempts++;
continue;
}
2020-10-20 09:38:49 +13:00
$attempts++;
2020-10-20 04:09:53 +13:00
}
2020-10-19 00:51:16 +13:00
2020-10-22 01:03:50 +13:00
Console::error('Failed to restart pub/sub...');
2020-10-19 00:51:16 +13:00
});
2021-02-26 22:21:07 +13:00
$server->on('start', function (Server $server) {
2020-10-19 00:51:16 +13:00
Console::success('Server started succefully');
2020-10-21 02:22:46 +13:00
Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}");
// listen ctrl + c
Process::signal(2, function () use ($server) {
Console::log('Stop by Ctrl+C');
$server->shutdown();
});
2020-10-16 20:31:09 +13:00
});
2021-02-25 06:12:38 +13:00
$server->on('open', function (Server $server, Request $request) use (&$connections, &$subscriptions, &$register) {
2021-03-13 03:16:40 +13:00
$app = new App('UTC');
2020-10-21 23:50:11 +13:00
$connection = $request->fd;
$request = new SwooleRequest($request);
2020-10-21 02:22:46 +13:00
2021-03-11 02:39:37 +13:00
Console::info("Connection open (user: {$connection}, worker: {$server->getWorkerId()})");
2020-10-21 23:50:11 +13:00
App::setResource('request', function () use ($request) {
return $request;
});
2021-03-10 05:07:13 +13:00
App::setResource('response', function () {
return new Response(new SwooleResponse());
});
2021-02-22 10:22:32 +13:00
2021-02-25 23:43:39 +13:00
/** @var Appwrite\Database\Document $user */
2020-10-22 20:16:40 +13:00
$user = $app->getResource('user');
2020-10-21 23:50:11 +13:00
/** @var Appwrite\Database\Document $project */
2021-02-25 23:43:39 +13:00
$project = $app->getResource('project');
2021-03-02 05:02:01 +13:00
/** @var Appwrite\Database\Document $console */
$console = $app->getResource('console');
2021-03-10 22:01:24 +13:00
try {
/*
* Project Check
*/
if (empty($project->getId())) {
throw new Exception('Missing or unknown project ID', 1008);
}
2021-03-10 22:01:24 +13:00
/*
* Abuse Check
2021-03-11 02:39:37 +13:00
*
* Abuse limits are connecting 128 times per minute and ip address.
2021-03-10 22:01:24 +13:00
*/
2021-03-11 02:39:37 +13:00
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, function () use ($register) {
2021-03-10 22:01:24 +13:00
return $register->get('db');
});
$timeLimit
->setNamespace('app_' . $project->getId())
->setParam('{ip}', $request->getIP())
->setParam('{url}', $request->getURI());
$abuse = new Abuse($timeLimit);
if ($abuse->check() && App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled') {
throw new Exception('Too many requests', 1013);
}
2021-03-10 22:01:24 +13:00
/*
* Validate Client Domain - Check to avoid CSRF attack.
* Adding Appwrite API domains to allow XDOMAIN communication.
* Skip this check for non-web platforms which are not required to send an origin header.
*/
$origin = $request->getOrigin();
$originValidator = new Origin(\array_merge($project->getAttribute('platforms', []), $console->getAttribute('platforms', [])));
2021-03-10 22:01:24 +13:00
if (!$originValidator->isValid($origin)) {
throw new Exception($originValidator->getDescription(), 1008);
}
2021-02-25 23:43:39 +13:00
2021-03-10 22:01:24 +13:00
Realtime::setUser($user);
2021-03-02 05:02:01 +13:00
2021-03-10 22:01:24 +13:00
$roles = Realtime::getRoles();
$channels = Realtime::parseChannels($request->getQuery('channels', []));
2021-03-02 05:02:01 +13:00
2021-03-10 22:01:24 +13:00
/**
* Channels Check
*/
if (empty($channels)) {
throw new Exception('Missing channels', 1008);
}
2021-03-10 22:01:24 +13:00
Realtime::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels);
2020-10-22 20:16:40 +13:00
2021-03-10 22:01:24 +13:00
$server->push($connection, json_encode($channels));
} catch (\Throwable $th) {
$response = [
'code' => $th->getCode(),
'message' => $th->getMessage()
];
$server->push($connection, json_encode($response));
$server->close($connection);
}
2020-10-16 20:31:09 +13:00
});
2021-02-25 23:43:39 +13:00
$server->on('message', function (Server $server, Frame $frame) {
2021-03-04 22:28:24 +13:00
$server->push($frame->fd, 'Sending messages is not allowed.');
$server->close($frame->fd);
2020-10-16 20:31:09 +13:00
});
2021-03-11 02:39:37 +13:00
$server->on('close', function (Server $server, int $connection) use (&$connections, &$subscriptions) {
Realtime::unsubscribe($connection, $subscriptions, $connections);
Console::info('Connection close: ' . $connection);
2020-10-16 20:31:09 +13:00
});
2021-02-26 22:21:07 +13:00
$server->start();