1
0
Fork 0
mirror of synced 2024-06-29 19:50:26 +12:00
appwrite/app/realtime.php

355 lines
12 KiB
PHP
Raw Normal View History

2020-10-16 20:31:09 +13:00
<?php
2021-02-25 06:12:38 +13:00
require_once __DIR__ . '/init.php';
require_once __DIR__ . '/../vendor/autoload.php';
2020-10-20 04:09:53 +13:00
2020-10-22 20:16:40 +13:00
use Appwrite\Auth\Auth;
2021-02-25 06:12:38 +13:00
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
use Appwrite\Database\Adapter\Redis as RedisAdapter;
use Appwrite\Database\Database;
2021-02-22 10:22:32 +13:00
use Appwrite\Database\Document;
2021-02-25 06:12:38 +13:00
use Appwrite\Database\Validator\Authorization;
use Appwrite\Extend\PDO;
2020-10-16 20:31:09 +13:00
use Swoole\WebSocket\Server;
2020-10-20 04:09:53 +13:00
use Swoole\Http\Request;
2020-10-21 02:22:46 +13:00
use Swoole\Process;
2020-10-16 20:31:09 +13:00
use Swoole\WebSocket\Frame;
2020-10-21 02:22:46 +13:00
use Utopia\App;
2020-10-19 00:51:16 +13:00
use Utopia\CLI\Console;
2021-02-25 06:12:38 +13:00
use Utopia\Config\Config;
use Utopia\Registry\Registry;
2021-02-22 10:22:32 +13:00
use Utopia\Swoole\Request as SwooleRequest;
2021-02-25 06:12:38 +13:00
use PDO as PDONative;
2020-10-16 20:31:09 +13:00
/**
* TODO List
*
* - Abuse Control / x mesages per connection
* - CORS Validation
* - Limit payload size
* - Message structure: { status: "ok"|"error", event: EVENT_NAME, data: <any arbitrary data> }
* - JWT Authentication (in path / or in message)
2020-10-17 18:48:03 +13:00
*
2020-10-21 02:22:46 +13:00
* Protocols Support:
* - Websocket support: https://www.swoole.co.uk/docs/modules/swoole-websocket-server
* - MQTT support: https://www.swoole.co.uk/docs/modules/swoole-mqtt-server
* - SSE support: https://github.com/hhxsv5/php-sse
* - Socket.io support: https://github.com/shuixn/socket.io-swoole-server
2020-10-16 20:31:09 +13:00
*/
2020-10-20 04:09:53 +13:00
ini_set('default_socket_timeout', -1);
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
2020-10-19 00:51:16 +13:00
2020-10-16 20:31:09 +13:00
$server = new Server("0.0.0.0", 80);
2021-02-25 06:12:38 +13:00
$server->set([
'worker_num' => 1
2021-02-25 23:43:39 +13:00
]);
2020-10-21 23:50:11 +13:00
$subscriptions = [];
2020-10-22 01:03:50 +13:00
$connections = [];
2020-10-19 00:51:16 +13:00
2021-02-25 06:12:38 +13:00
$register = new Registry();
$register->set('db', function () { // Register DB connection
$dbHost = App::getEnv('_APP_DB_HOST', '');
$dbUser = App::getEnv('_APP_DB_USER', '');
$dbPass = App::getEnv('_APP_DB_PASS', '');
$dbScheme = App::getEnv('_APP_DB_SCHEMA', '');
$pdo = new PDO("mysql:host={$dbHost};dbname={$dbScheme};charset=utf8mb4", $dbUser, $dbPass, array(
PDONative::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4',
PDONative::ATTR_TIMEOUT => 3, // Seconds
PDONative::ATTR_PERSISTENT => true
));
// Connection settings
$pdo->setAttribute(PDONative::ATTR_DEFAULT_FETCH_MODE, PDONative::FETCH_ASSOC); // Return arrays
$pdo->setAttribute(PDONative::ATTR_ERRMODE, PDONative::ERRMODE_EXCEPTION); // Handle all errors with exceptions
return $pdo;
});
$register->set('cache', function () { // Register cache connection
$redis = new Redis();
$redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
2021-02-25 23:43:39 +13:00
$user = App::getEnv('_APP_REDIS_USER', '');
$pass = App::getEnv('_APP_REDIS_PASS', '');
2021-02-25 06:12:38 +13:00
$auth = [];
2021-02-25 23:43:39 +13:00
if (!empty($user)) {
2021-02-25 06:12:38 +13:00
$auth["user"] = $user;
}
2021-02-25 23:43:39 +13:00
if (!empty($pass)) {
2021-02-25 06:12:38 +13:00
$auth["pass"] = $pass;
}
2021-02-25 23:43:39 +13:00
if (!empty($auth)) {
2021-02-25 06:12:38 +13:00
$redis->auth($auth);
}
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
return $redis;
});
$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections, &$register) {
Console::success('Worker ' . ++$workerId . ' started succefully');
2020-10-19 00:51:16 +13:00
2020-10-20 04:09:53 +13:00
$attempts = 0;
$start = time();
2021-02-25 06:12:38 +13:00
2020-10-20 09:38:49 +13:00
while ($attempts < 300) {
2020-10-20 04:09:53 +13:00
try {
2021-02-25 06:12:38 +13:00
if ($attempts > 0) {
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
2020-10-20 09:38:49 +13:00
sleep(5); // 1 sec delay between connection attempts
}
2021-02-25 06:12:38 +13:00
$redis = $register->get('cache');
2020-10-20 04:09:53 +13:00
2021-02-25 06:12:38 +13:00
if ($redis->ping(true)) {
2020-10-20 04:09:53 +13:00
$attempts = 0;
2021-02-25 06:12:38 +13:00
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
} else {
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
2020-10-20 07:56:02 +13:00
}
2021-02-22 10:22:32 +13:00
2021-02-25 23:43:39 +13:00
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, &$connections, &$subscriptions) {
2021-02-22 10:22:32 +13:00
/**
* Supported Resources:
* - Collection
* - Document
* - Bucket
* - File
* - User? / Account? (no permissions)
* - Session? (no permissions)
* - Team? (no permissions)
* - Membership? (no permissions)
* - Function
* - Execution
*/
2021-02-25 23:43:39 +13:00
$event = json_decode($payload, true);
2021-02-25 06:12:38 +13:00
$receivers = [];
foreach ($connections as $fd => $connection) {
2021-02-25 23:43:39 +13:00
if ($connection['projectId'] !== $event['project']) {
2021-02-25 06:12:38 +13:00
continue;
}
foreach ($connection['roles'] as $role) {
2021-02-25 23:43:39 +13:00
if (\array_key_exists($role, $subscriptions[$event['project']])) {
foreach ($event['data']['channels'] as $channel) {
if (\array_key_exists($channel, $subscriptions[$event['project']][$role]) && \in_array($role, $event['permissions'])) {
foreach (array_keys($subscriptions[$event['project']][$role][$channel]) as $ids) {
$receivers[] = $ids;
}
2021-02-25 06:12:38 +13:00
break;
}
}
}
}
}
$receivers = array_keys(array_flip($receivers));
foreach ($receivers as $receiver) {
2021-02-25 23:43:39 +13:00
if ($server->exist($receiver) && $server->isEstablished($receiver)) {
$server->push(
$receiver,
json_encode($event['data']),
SWOOLE_WEBSOCKET_OPCODE_TEXT,
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS
);
} else {
2021-02-25 06:12:38 +13:00
$server->close($receiver);
2021-02-22 10:22:32 +13:00
}
}
2020-10-20 04:09:53 +13:00
});
} catch (\Throwable $th) {
2021-02-25 06:12:38 +13:00
Console::error('Pub/sub error: ' . $th->getMessage());
2020-10-20 04:09:53 +13:00
$attempts++;
continue;
}
2020-10-20 09:38:49 +13:00
$attempts++;
2020-10-20 04:09:53 +13:00
}
2020-10-19 00:51:16 +13:00
2020-10-22 01:03:50 +13:00
Console::error('Failed to restart pub/sub...');
2020-10-19 00:51:16 +13:00
});
2020-10-16 20:31:09 +13:00
$server->on("start", function (Server $server) {
2020-10-19 00:51:16 +13:00
Console::success('Server started succefully');
2020-10-21 02:22:46 +13:00
Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}");
// listen ctrl + c
Process::signal(2, function () use ($server) {
Console::log('Stop by Ctrl+C');
$server->shutdown();
});
2020-10-16 20:31:09 +13:00
});
2021-02-25 06:12:38 +13:00
$server->on('open', function (Server $server, Request $request) use (&$connections, &$subscriptions, &$register) {
2020-10-20 04:09:53 +13:00
Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})");
2020-10-19 00:51:16 +13:00
2020-10-22 20:16:40 +13:00
$app = new App('');
2020-10-21 23:50:11 +13:00
$connection = $request->fd;
$request = new SwooleRequest($request);
2020-10-21 02:22:46 +13:00
2020-10-21 23:50:11 +13:00
App::setResource('request', function () use ($request) {
return $request;
});
2021-02-25 06:12:38 +13:00
App::setResource('consoleDB', function () use (&$register) {
$consoleDB = new Database();
$consoleDB->setAdapter(new MySQLAdapter($register)); // TODO: Add Redis
$consoleDB->setNamespace('app_console'); // Should be replaced with param if we want to have parent projects
$consoleDB->setMocks(Config::getParam('collections', []));
2020-10-21 23:50:11 +13:00
2021-02-25 06:12:38 +13:00
return $consoleDB;
}, ['register']);
2021-02-22 10:22:32 +13:00
2021-02-25 06:12:38 +13:00
App::setResource('project', function ($consoleDB, $request) {
/** @var Utopia\Swoole\Request $request */
/** @var Appwrite\Database\Database $consoleDB */
Authorization::disable();
$project = $consoleDB->getDocument($request->getQuery('project'));
Authorization::reset();
return $project;
}, ['consoleDB', 'request']);
App::setResource('user', function ($project, $request, $projectDB) {
/** @var Utopia\Swoole\Request $request */
/** @var Appwrite\Database\Document $project */
/** @var Appwrite\Database\Database $projectDB */
Authorization::setDefaultStatus(true);
Auth::setCookieName('a_session_' . $project->getId());
$session = Auth::decodeSession(
$request->getCookie(
Auth::$cookieName, // Get sessions
$request->getCookie(Auth::$cookieName . '_legacy', '')
)
); // Get fallback session from old clients (no SameSite support)
Auth::$unique = $session['id'];
Auth::$secret = $session['secret'];
$user = $projectDB->getDocument(Auth::$unique);
if (
empty($user->getId()) // Check a document has been found in the DB
|| Database::SYSTEM_COLLECTION_USERS !== $user->getCollection() // Validate returned document is really a user document
|| !Auth::tokenVerify($user->getAttribute('tokens', []), Auth::TOKEN_TYPE_LOGIN, Auth::$secret)
) { // Validate user has valid login token
$user = new Document(['$id' => '', '$collection' => Database::SYSTEM_COLLECTION_USERS]);
}
return $user;
}, ['project', 'request', 'projectDB']);
2021-02-22 10:22:32 +13:00
2021-02-25 23:43:39 +13:00
/** @var Appwrite\Database\Document $user */
2020-10-22 20:16:40 +13:00
$user = $app->getResource('user');
2020-10-21 23:50:11 +13:00
/** @var Appwrite\Database\Document $project */
2021-02-25 23:43:39 +13:00
$project = $app->getResource('project');
$channels = $request->getQuery('channels', []);
if (empty($project->getId())) {
$server->push($connection, 'Missing or unknown project ID');
$server->close($connection);
}
if (empty($request->getQuery('channels', []))) {
$server->push($connection, 'Missing or unknown channels');
$server->close($connection);
}
$roles = ['*', 'user:' . $user->getId(), 'role:' . (($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)];
$channels = array_flip($channels);
2020-10-21 23:50:11 +13:00
2020-10-22 20:16:40 +13:00
\array_map(function ($node) use (&$roles) {
if (isset($node['teamId']) && isset($node['roles'])) {
2021-02-25 06:12:38 +13:00
$roles[] = 'team:' . $node['teamId'];
2020-10-22 20:16:40 +13:00
foreach ($node['roles'] as $nodeRole) { // Set all team roles
2021-02-25 06:12:38 +13:00
$roles[] = 'team:' . $node['teamId'] . '/' . $nodeRole;
2020-10-22 20:16:40 +13:00
}
}
}, $user->getAttribute('memberships', []));
2021-02-22 10:22:32 +13:00
/**
* Build Subscriptions Tree
*
* [PROJECT_ID] ->
* [ROLE_X] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
* [ROLE_Y] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
*/
2021-02-25 06:12:38 +13:00
if (!isset($subscriptions[$project->getId()])) { // Init Project
2020-10-21 23:50:11 +13:00
$subscriptions[$project->getId()] = [];
}
2020-10-22 20:16:40 +13:00
foreach ($roles as $key => $role) {
2021-02-25 06:12:38 +13:00
if (!isset($subscriptions[$project->getId()][$role])) { // Add user first connection
2020-10-22 20:16:40 +13:00
$subscriptions[$project->getId()][$role] = [];
}
2021-02-25 06:12:38 +13:00
2020-10-22 20:16:40 +13:00
foreach ($channels as $channel => $list) {
$subscriptions[$project->getId()][$role][$channel][$connection] = true;
}
2020-10-22 01:03:50 +13:00
}
$connections[$connection] = [
'projectId' => $project->getId(),
2020-10-22 20:16:40 +13:00
'roles' => $roles,
2020-10-21 23:50:11 +13:00
];
2020-10-16 20:31:09 +13:00
});
2021-02-25 23:43:39 +13:00
$server->on('message', function (Server $server, Frame $frame) {
2021-02-25 06:12:38 +13:00
if ($frame->data === 'reload') {
2020-10-20 04:09:53 +13:00
$server->reload();
}
2021-02-25 06:12:38 +13:00
Console::info('Recieved message: ' . $frame->data . ' (user: ' . $frame->fd . ', worker: ' . $server->getWorkerId() . ')');
2020-10-16 20:31:09 +13:00
});
2021-02-25 06:12:38 +13:00
$server->on('close', function (Server $server, int $fd) use (&$connections, &$subscriptions) {
2020-10-22 01:03:50 +13:00
$projectId = $connections[$fd]['projectId'] ?? '';
2020-10-22 20:16:40 +13:00
$roles = $connections[$fd]['roles'] ?? [];
2020-10-22 01:03:50 +13:00
2020-10-22 20:16:40 +13:00
foreach ($roles as $key => $role) {
foreach ($subscriptions[$projectId][$role] as $channel => $list) {
unset($subscriptions[$projectId][$role][$channel][$fd]); // Remove connection
2020-10-22 01:03:50 +13:00
2021-02-25 06:12:38 +13:00
if (empty($subscriptions[$projectId][$role][$channel])) {
2021-02-22 10:22:32 +13:00
unset($subscriptions[$projectId][$role][$channel]); // Remove channel when no connections
2020-10-22 20:16:40 +13:00
}
2020-10-22 01:03:50 +13:00
}
2021-02-25 06:12:38 +13:00
if (empty($subscriptions[$projectId][$role])) {
2021-02-22 10:22:32 +13:00
unset($subscriptions[$projectId][$role]); // Remove role when no channels
2020-10-22 20:16:40 +13:00
}
2020-10-22 01:03:50 +13:00
}
2021-02-25 06:12:38 +13:00
if (empty($subscriptions[$projectId])) { // Remove project when no roles
2021-02-22 10:22:32 +13:00
unset($subscriptions[$projectId]);
}
2020-10-22 01:03:50 +13:00
unset($connections[$fd]);
2021-02-25 06:12:38 +13:00
Console::info('Connection close: ' . $fd);
2020-10-16 20:31:09 +13:00
});
$server->start();