1
0
Fork 0
mirror of synced 2024-06-03 03:14:50 +12:00
appwrite/app/realtime.php

596 lines
22 KiB
PHP
Raw Normal View History

2020-10-16 20:31:09 +13:00
<?php
use Appwrite\Auth\Auth;
2021-06-29 02:34:28 +12:00
use Appwrite\Messaging\Adapter\Realtime;
2021-06-25 00:22:32 +12:00
use Appwrite\Network\Validator\Origin;
2021-08-27 21:20:49 +12:00
use Appwrite\Utopia\Response;
2021-06-25 00:22:32 +12:00
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Runtime;
use Swoole\Table;
use Swoole\Timer;
use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit;
2021-06-16 21:09:12 +12:00
use Utopia\App;
2021-06-25 00:22:32 +12:00
use Utopia\CLI\Console;
2022-08-15 02:22:38 +12:00
use Utopia\Database\ID;
2022-08-17 15:11:49 +12:00
use Utopia\Database\Role;
use Utopia\Logger\Log;
2021-10-01 00:18:50 +13:00
use Utopia\Database\Database;
2022-07-15 01:12:44 +12:00
use Utopia\Database\DateTime;
2021-10-01 00:18:50 +13:00
use Utopia\Cache\Adapter\Redis as RedisCache;
use Utopia\Cache\Cache;
use Utopia\Database\Adapter\MariaDB;
2021-10-08 04:35:17 +13:00
use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Registry\Registry;
use Appwrite\Utopia\Request;
2021-06-25 00:22:32 +12:00
use Utopia\WebSocket\Server;
use Utopia\WebSocket\Adapter;
2021-02-25 06:12:38 +13:00
require_once __DIR__ . '/init.php';
2020-10-16 20:31:09 +13:00
2021-06-25 00:22:32 +12:00
// Enable Swoole's runtime coroutine hooks for every supported hook type.
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);

$realtime = new Realtime();

/**
 * Swoole table used to share statistics across all workers.
 * Rows are written with the project ID as key (see the onOpen handler).
 */
$stats = new Table(4096, 1);
$stats->column('projectId', Table::TYPE_STRING, 64);
$stats->column('teamId', Table::TYPE_STRING, 64);
$stats->column('connections', Table::TYPE_INT);
$stats->column('connectionsTotal', Table::TYPE_INT);
$stats->column('messages', Table::TYPE_INT);
$stats->create();

// Identifier of this container and the stats document it writes to (set in onStart).
$containerId = uniqid();
$statsDocument = null;

// Worker count scales with the CPU count; the per-core factor defaults to 6.
$workerNumber = (int) App::getEnv('_APP_WORKER_PER_CORE', 6) * swoole_cpu_num();

$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80));
$adapter->setPackageMaxLength(64000); // Default maximum Package Size (64kb)
$adapter->setWorkerNumber($workerNumber);

$server = new Server($adapter);
2021-06-29 02:34:28 +12:00
2022-05-09 19:35:55 +12:00
/**
 * Report an error: push it to the registered logger when one is available,
 * then always print a summary to the console.
 *
 * @param Throwable $error  The error to report.
 * @param string    $action Label stored on the log entry via Log::setAction().
 */
$logError = function (Throwable $error, string $action) use ($register) {
    $errorClass = get_class($error);
    $logger = $register->get('logger');

    if ($logger) {
        $environment = App::getEnv('_APP_ENV', 'development') === 'production'
            ? Log::ENVIRONMENT_PRODUCTION
            : Log::ENVIRONMENT_STAGING;

        $log = new Log();

        // Where the error happened.
        $log->setNamespace("realtime");
        $log->setServer(\gethostname());
        $log->setVersion(App::getEnv('_APP_VERSION', 'UNKNOWN'));

        // What the error was.
        $log->setType(Log::TYPE_ERROR);
        $log->setMessage($error->getMessage());
        $log->addTag('code', $error->getCode());
        $log->addTag('verboseType', $errorClass);
        $log->addExtra('file', $error->getFile());
        $log->addExtra('line', $error->getLine());
        $log->addExtra('trace', $error->getTraceAsString());
        $log->addExtra('detailedTrace', $error->getTrace());

        $log->setAction($action);
        $log->setEnvironment($environment);

        $statusCode = $logger->addLog($log);
        Console::info('Realtime log pushed with status code: ' . $statusCode);
    }

    Console::error('[Error] Type: ' . $errorClass);
    Console::error('[Error] Message: ' . $error->getMessage());
    Console::error('[Error] File: ' . $error->getFile());
    Console::error('[Error] Line: ' . $error->getLine());
};

$server->error($logError);
2021-06-29 02:34:28 +12:00
2021-10-08 04:35:17 +13:00
/**
 * Build a Database instance for the given namespace on top of pooled PDO and Redis connections.
 *
 * Each attempt checks out one connection from `dbPool` and one from `redisPool` and verifies
 * that the `realtime` collection exists. A failed attempt is retried after
 * DATABASE_RECONNECT_SLEEP seconds, up to DATABASE_RECONNECT_MAX_ATTEMPTS attempts.
 *
 * @param Registry $register  Registry exposing the `dbPool` and `redisPool` services.
 * @param string   $namespace Namespace set on the returned Database instance.
 *
 * @return array{0: Database, 1: callable} The database, and a callback that puts the
 *                                         checked-out connections back into their pools.
 *
 * @throws \Exception When the last allowed attempt fails.
 */
function getDatabase(Registry &$register, string $namespace)
{
    $attempts = 0;

    while (true) {
        $attempts++;

        // Reset per attempt so the catch block knows exactly what this attempt checked out.
        $db = null;
        $redis = null;

        try {
            $db = $register->get('dbPool')->get();
            $redis = $register->get('redisPool')->get();

            $cache = new Cache(new RedisCache($redis));
            $database = new Database(new MariaDB($db), $cache);
            $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
            $database->setNamespace($namespace);

            if (!$database->exists($database->getDefaultDatabase(), 'realtime')) {
                throw new Exception('Collection not ready');
            }

            break; // leave loop if successful
        } catch (\Throwable $e) {
            Console::warning("Database not ready. Retrying connection ({$attempts})...");

            // Give back whatever this attempt checked out; otherwise every retry
            // permanently removes one connection from each pool.
            if ($db !== null) {
                // Same convention as the onOpen handler: after a PDOException the
                // connection is not reused and `null` is handed to the pool instead.
                $register->get('dbPool')->put($e instanceof \PDOException ? null : $db);
            }
            if ($redis !== null) {
                $register->get('redisPool')->put($redis);
            }

            if ($attempts >= DATABASE_RECONNECT_MAX_ATTEMPTS) {
                throw new \Exception('Failed to connect to database: ' . $e->getMessage());
            }

            sleep(DATABASE_RECONNECT_SLEEP);
        }
    }

    return [
        $database,
        function () use ($register, $db, $redis) {
            $register->get('dbPool')->put($db);
            $register->get('redisPool')->put($redis);
        }
    ];
}
2021-07-01 22:31:48 +12:00
/**
 * Server start hook.
 *
 * Creates one document in the `realtime` collection for this container and then
 * refreshes it every 5 seconds with the `connectionsTotal` value of every project
 * found in the shared stats table.
 */
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) {
    sleep(5); // wait for the initial database schema to be ready
    Console::success('Server started successfully');

    /**
     * Create document for this worker to share stats across Containers.
     */
    go(function () use ($register, $containerId, &$statsDocument) {
        $attempts = 0;
        [$database, $returnDatabase] = getDatabase($register, '_console');

        // Retry until the document can be created.
        do {
            try {
                $attempts++;
                $document = new Document([
                    '$id' => ID::unique(),
                    '$collection' => ID::custom('realtime'),
                    '$permissions' => [],
                    'container' => $containerId,
                    'timestamp' => DateTime::now(),
                    'value' => '{}'
                ]);

                $statsDocument = Authorization::skip(fn () => $database->createDocument('realtime', $document));
                break;
            } catch (\Throwable $th) {
                Console::warning("Collection not ready. Retrying connection ({$attempts})...");
                sleep(DATABASE_RECONNECT_SLEEP);
            }
        } while (true);

        call_user_func($returnDatabase);
    });

    /**
     * Save current connections to the Database every 5 seconds.
     */
    Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
        $payload = [];
        foreach ($stats as $projectId => $value) {
            $payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
        }

        // Nothing to persist yet, or the stats document has not been created.
        if (empty($payload) || empty($statsDocument)) {
            return;
        }

        // Stays null when getDatabase() itself throws, so that the finally block
        // does not try to invoke an undefined callback.
        $returnDatabase = null;

        try {
            [$database, $returnDatabase] = getDatabase($register, '_console');

            $statsDocument
                ->setAttribute('timestamp', DateTime::now())
                ->setAttribute('value', json_encode($payload));

            Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
        } catch (\Throwable $th) {
            call_user_func($logError, $th, "updateWorkerDocument");
        } finally {
            if ($returnDatabase !== null) {
                call_user_func($returnDatabase);
            }
        }
    });
});
2022-06-03 22:34:44 +12:00
/**
 * Worker start hook.
 *
 * Registers a 5-second timer that broadcasts aggregated connection stats (and the
 * SDK test message) to console subscribers, then subscribes to the `realtime` Redis
 * channel and forwards every received event to the matching connections. The
 * subscription is retried until 300 consecutive failed attempts have accumulated.
 */
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
    Console::success('Worker ' . $workerId . ' started successfully');

    $attempts = 0;
    $start = time();

    Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError) {
        /**
         * Sending current connections to project channels on the console project every 5 seconds.
         */
        if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
            [$database, $returnDatabase] = getDatabase($register, '_console');

            // try/finally guarantees the connections go back to their pools even
            // when the query or the broadcast throws.
            try {
                $payload = [];

                $list = Authorization::skip(fn () => $database->find('realtime', [
                    Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
                ]));

                /**
                 * Aggregate stats across containers.
                 */
                foreach ($list as $document) {
                    foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
                        if (array_key_exists($projectId, $payload)) {
                            $payload[$projectId] += $value;
                        } else {
                            $payload[$projectId] = $value;
                        }
                    }
                }

                foreach ($stats as $projectId => $value) {
                    if (!array_key_exists($projectId, $payload)) {
                        continue;
                    }

                    $event = [
                        'project' => 'console',
                        'roles' => ['team:' . $stats->get($projectId, 'teamId')],
                        'data' => [
                            'events' => ['stats.connections'],
                            'channels' => ['project'],
                            'timestamp' => DateTime::now(),
                            'payload' => [
                                $projectId => $payload[$projectId]
                            ]
                        ]
                    ];

                    $server->send($realtime->getSubscribers($event), json_encode([
                        'type' => 'event',
                        'data' => $event['data']
                    ]));
                }
            } finally {
                call_user_func($returnDatabase);
            }
        }

        /**
         * Sending test message for SDK E2E tests every 5 seconds.
         */
        if ($realtime->hasSubscriber('console', Role::guests()->toString(), 'tests')) {
            $payload = ['response' => 'WS:/v1/realtime:passed'];

            $event = [
                'project' => 'console',
                'roles' => [Role::guests()->toString()],
                'data' => [
                    'events' => ['test.event'],
                    'channels' => ['tests'],
                    'timestamp' => DateTime::now(),
                    'payload' => $payload
                ]
            ];

            $server->send($realtime->getSubscribers($event), json_encode([
                'type' => 'event',
                'data' => $event['data']
            ]));
        }
    });

    while ($attempts < 300) {
        // Reset per iteration: the catch block must only return a connection that
        // THIS iteration checked out, never a stale one from an earlier iteration.
        $redis = null;

        try {
            if ($attempts > 0) {
                Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ").\nAttempting restart in 5 seconds (attempt #" . $attempts . ')');
                sleep(5); // 5 sec delay between connection attempts
            }
            $start = time();

            /** @var Redis $redis */
            $redis = $register->get('redisPool')->get();
            $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

            if ($redis->ping(true)) {
                $attempts = 0;
                Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
            } else {
                Console::error('Pub/sub failed (worker: ' . $workerId . ')');
            }

            $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) {
                $event = json_decode($payload, true);

                // `??` avoids an undefined-index warning for events without the flag.
                if (($event['permissionsChanged'] ?? false) && isset($event['userId'])) {
                    $projectId = $event['project'];
                    $userId = $event['userId'];

                    if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
                        $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));

                        // Re-subscribe the connection with the user's current roles.
                        // Nested try/finally returns both database handles on any failure.
                        [$consoleDatabase, $returnConsoleDatabase] = getDatabase($register, '_console');
                        try {
                            $project = Authorization::skip(fn () => $consoleDatabase->getDocument('projects', $projectId));

                            [$database, $returnDatabase] = getDatabase($register, "_{$project->getInternalId()}");
                            try {
                                $user = $database->getDocument('users', $userId);
                                $roles = Auth::getRoles($user);

                                $realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
                            } finally {
                                call_user_func($returnDatabase);
                            }
                        } finally {
                            call_user_func($returnConsoleDatabase);
                        }
                    }
                }

                $receivers = $realtime->getSubscribers($event);

                if (App::isDevelopment() && !empty($receivers)) {
                    Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
                    Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers));
                    Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
                }

                $server->send(
                    $receivers,
                    json_encode([
                        'type' => 'event',
                        'data' => $event['data']
                    ])
                );

                if (($num = count($receivers)) > 0) {
                    $stats->incr($event['project'], 'messages', $num);
                }
            });
        } catch (\Throwable $th) {
            call_user_func($logError, $th, "pubSubConnection");

            Console::error('Pub/sub error: ' . $th->getMessage());

            if ($redis !== null) {
                $register->get('redisPool')->put($redis);
            }

            $attempts++;
            sleep(DATABASE_RECONNECT_SLEEP);
        }
    }

    Console::error('Failed to restart pub/sub...');
});
2021-12-21 22:29:09 +13:00
/**
 * Connection open hook.
 *
 * Resolves the user, project and console documents for the incoming request, then
 * checks in order: project presence, abuse limit, origin, and requested channels.
 * On success the connection is subscribed, a `connected` message is sent and the
 * project's stats counters are incremented. On any failure an `error` message is
 * sent and the connection is closed with the error's code.
 */
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $logError) {
    $app = new App('UTC');
    $request = new Request($request);
    $response = new Response(new SwooleResponse());

    /** @var PDO $db */
    $db = $register->get('dbPool')->get();
    /** @var Redis $redis */
    $redis = $register->get('redisPool')->get();

    Console::info("Connection open (user: {$connection})");

    App::setResource('db', function () use ($db) {
        return $db;
    });
    App::setResource('cache', function () use ($redis) {
        return $redis;
    });
    App::setResource('request', function () use ($request) {
        return $request;
    });
    App::setResource('response', function () use ($response) {
        return $response;
    });

    try {
        /** @var \Utopia\Database\Document $user */
        $user = $app->getResource('user');

        /** @var \Utopia\Database\Document $project */
        $project = $app->getResource('project');

        /** @var \Utopia\Database\Document $console */
        $console = $app->getResource('console');

        $database = new Database(new MariaDB($db), new Cache(new RedisCache($redis)));
        $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
        $database->setNamespace("_{$project->getInternalId()}");

        /*
         * Project Check
         */
        if (empty($project->getId())) {
            throw new Exception('Missing or unknown project ID', 1008);
        }

        /*
         * Abuse Check
         *
         * Abuse limits are connecting 128 times per minute and ip address.
         */
        $limiter = new TimeLimit('url:{url},ip:{ip}', 128, 60, $database);
        $limiter->setParam('{ip}', $request->getIP());
        $limiter->setParam('{url}', $request->getURI());

        $abuseGuard = new Abuse($limiter);
        $abuseEnabled = App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled';

        if ($abuseEnabled && $abuseGuard->check()) {
            throw new Exception('Too many requests', 1013);
        }

        /*
         * Validate Client Domain - Check to avoid CSRF attack.
         * Adding Appwrite API domains to allow XDOMAIN communication.
         * Skip this check for non-web platforms which are not required to send an origin header.
         */
        $origin = $request->getOrigin();
        $allowedPlatforms = \array_merge(
            $project->getAttribute('platforms', []),
            $console->getAttribute('platforms', [])
        );
        $originValidator = new Origin($allowedPlatforms);

        if (!$originValidator->isValid($origin) && $project->getId() !== 'console') {
            throw new Exception($originValidator->getDescription(), 1008);
        }

        $roles = Auth::getRoles($user);
        $channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId());

        /**
         * Channels Check
         */
        if (empty($channels)) {
            throw new Exception('Missing channels', 1008);
        }

        $realtime->subscribe($project->getId(), $connection, $roles, $channels);

        // Users without an ID are reported as `null` in the handshake.
        if (empty($user->getId())) {
            $user = null;
        } else {
            $user = $response->output($user, Response::MODEL_ACCOUNT);
        }

        $handshake = [
            'type' => 'connected',
            'data' => [
                'channels' => array_keys($channels),
                'user' => $user
            ]
        ];
        $server->send([$connection], json_encode($handshake));

        $stats->set($project->getId(), [
            'projectId' => $project->getId(),
            'teamId' => $project->getAttribute('teamId')
        ]);
        $stats->incr($project->getId(), 'connections');
        $stats->incr($project->getId(), 'connectionsTotal');
    } catch (\Throwable $th) {
        call_user_func($logError, $th, "initServer");

        $failure = [
            'type' => 'error',
            'data' => [
                'code' => $th->getCode(),
                'message' => $th->getMessage()
            ]
        ];

        $server->send([$connection], json_encode($failure));
        $server->close($connection, $th->getCode());

        if (App::isDevelopment()) {
            Console::error('[Error] Connection Error');
            Console::error('[Error] Code: ' . $failure['data']['code']);
            Console::error('[Error] Message: ' . $failure['data']['message']);
        }

        // After a PDO failure `null` is handed to the pool instead of the connection.
        if ($th instanceof PDOException) {
            $db = null;
        }
    } finally {
        /**
         * Put used PDO and Redis Connections back into their pools.
         */
        $register->get('dbPool')->put($db);
        $register->get('redisPool')->put($redis);
    }
});
/**
 * Incoming message hook.
 *
 * Applies a per-connection abuse limit, validates the JSON envelope (`type` and
 * `data` are both required) and dispatches on `type`. The only supported type is
 * `authentication`, which verifies the supplied session and re-subscribes the
 * connection with the authenticated user's roles.
 *
 * Any failure is reported to the client as an `error` message; the connection is
 * closed only when the error code is 1008.
 */
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
    // Initialised up front so the finally block only returns what was actually checked out.
    $db = null;
    $redis = null;

    try {
        $app = new App('UTC');
        $response = new Response(new SwooleResponse());

        $db = $register->get('dbPool')->get();
        $redis = $register->get('redisPool')->get();

        $cache = new Cache(new RedisCache($redis));
        $database = new Database(new MariaDB($db), $cache);
        $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
        $database->setNamespace("_console");

        $projectId = $realtime->connections[$connection]['projectId'];
        $project = $projectId === 'console'
            ? $app->getResource('console')
            : Authorization::skip(fn () => $database->getDocument('projects', $projectId));

        $database->setNamespace("_{$project->getInternalId()}");

        /*
         * Abuse Check
         *
         * Abuse limits are sending 32 times per minute and connection.
         *
         * NOTE(review): the key template references `{url}`, but only `{connection}`
         * and `{container}` are set below - confirm which placeholders are intended.
         */
        $timeLimit = new TimeLimit('url:{url},connection:{connection}', 32, 60, $database);
        $timeLimit
            ->setParam('{connection}', $connection)
            ->setParam('{container}', $containerId);

        $abuse = new Abuse($timeLimit);

        // Test the option first (same order as the onOpen handler) so the check is
        // skipped entirely when abuse protection is disabled.
        if (App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
            throw new Exception('Too many messages', 1013);
        }

        $message = json_decode($message, true);

        // Both keys are required: `type` drives the dispatch and `data` is read by the handler.
        if (!is_array($message) || !array_key_exists('type', $message) || !array_key_exists('data', $message)) {
            throw new Exception('Message format is not valid.', 1003);
        }

        switch ($message['type']) {
            /**
             * This type is used to authenticate.
             */
            case 'authentication':
                if (!is_array($message['data']) || !array_key_exists('session', $message['data'])) {
                    throw new Exception('Payload is not valid.', 1003);
                }

                $session = Auth::decodeSession($message['data']['session']);
                Auth::$unique = $session['id'] ?? '';
                Auth::$secret = $session['secret'] ?? '';

                $user = $database->getDocument('users', Auth::$unique);
                $authDuration = $project->getAttribute('auths', [])['duration'] ?? Auth::TOKEN_EXPIRATION_LOGIN_LONG;

                if (
                    empty($user->getId()) // Check a document has been found in the DB
                    || !Auth::sessionVerify($user->getAttribute('sessions', []), Auth::$secret, $authDuration) // Validate user has valid login token
                ) {
                    // cookie not valid
                    throw new Exception('Session is not valid.', 1003);
                }

                $roles = Auth::getRoles($user);
                $channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId());
                $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels);

                $user = $response->output($user, Response::MODEL_ACCOUNT);
                $server->send([$connection], json_encode([
                    'type' => 'response',
                    'data' => [
                        'to' => 'authentication',
                        'success' => true,
                        'user' => $user
                    ]
                ]));
                break;

            default:
                throw new Exception('Message type is not valid.', 1003);
        }
    } catch (\Throwable $th) {
        $response = [
            'type' => 'error',
            'data' => [
                'code' => $th->getCode(),
                'message' => $th->getMessage()
            ]
        ];

        $server->send([$connection], json_encode($response));

        if ($th->getCode() === 1008) {
            $server->close($connection, $th->getCode());
        }
    } finally {
        if ($db !== null) {
            $register->get('dbPool')->put($db);
        }
        if ($redis !== null) {
            $register->get('redisPool')->put($redis);
        }
    }
});
2021-06-30 04:22:10 +12:00
/**
 * Connection close hook: decrement the project's `connectionsTotal` counter when
 * the connection is known, then drop its subscriptions.
 */
$server->onClose(function (int $connection) use ($realtime, $stats) {
    $isKnown = array_key_exists($connection, $realtime->connections);

    if ($isKnown) {
        $projectId = $realtime->connections[$connection]['projectId'];
        $stats->decr($projectId, 'connectionsTotal');
    }

    $realtime->unsubscribe($connection);

    Console::info("Connection close: {$connection}");
});

$server->start();