2020-10-16 20:31:09 +13:00
|
|
|
<?php
|
|
|
|
|
2021-06-28 22:18:00 +12:00
|
|
|
use Appwrite\Auth\Auth;
|
2021-06-29 02:34:28 +12:00
|
|
|
use Appwrite\Messaging\Adapter\Realtime;
|
2021-06-25 00:22:32 +12:00
|
|
|
use Appwrite\Network\Validator\Origin;
|
2021-08-27 21:20:49 +12:00
|
|
|
use Appwrite\Utopia\Response;
|
2021-06-25 00:22:32 +12:00
|
|
|
use Swoole\Http\Request as SwooleRequest;
|
|
|
|
use Swoole\Http\Response as SwooleResponse;
|
|
|
|
use Swoole\Runtime;
|
|
|
|
use Swoole\Table;
|
|
|
|
use Swoole\Timer;
|
|
|
|
use Utopia\Abuse\Abuse;
|
|
|
|
use Utopia\Abuse\Adapters\TimeLimit;
|
2021-06-16 21:09:12 +12:00
|
|
|
use Utopia\App;
|
2021-06-25 00:22:32 +12:00
|
|
|
use Utopia\CLI\Console;
|
2022-08-15 02:22:38 +12:00
|
|
|
use Utopia\Database\ID;
|
2022-08-17 15:11:49 +12:00
|
|
|
use Utopia\Database\Role;
|
2021-11-24 03:24:25 +13:00
|
|
|
use Utopia\Logger\Log;
|
2022-07-15 01:12:44 +12:00
|
|
|
use Utopia\Database\DateTime;
|
2021-10-08 04:35:17 +13:00
|
|
|
use Utopia\Database\Document;
|
|
|
|
use Utopia\Database\Query;
|
|
|
|
use Utopia\Database\Validator\Authorization;
|
2021-12-31 05:17:01 +13:00
|
|
|
use Appwrite\Utopia\Request;
|
2022-10-18 06:26:21 +13:00
|
|
|
use Utopia\Cache\Adapter\Sharding;
|
|
|
|
use Utopia\Cache\Cache;
|
|
|
|
use Utopia\Config\Config;
|
|
|
|
use Utopia\Database\Database;
|
2021-06-25 00:22:32 +12:00
|
|
|
use Utopia\WebSocket\Server;
|
|
|
|
use Utopia\WebSocket\Adapter;
|
2021-02-25 06:12:38 +13:00
|
|
|
|
2021-06-14 22:48:31 +12:00
|
|
|
require_once __DIR__ . '/init.php';
|
2020-10-16 20:31:09 +13:00
|
|
|
|
2021-06-25 00:22:32 +12:00
|
|
|
// Turn on Swoole's runtime coroutine hooks, passing the SWOOLE_HOOK_ALL flag.
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
2020-10-19 00:51:16 +13:00
|
|
|
|
2022-10-18 06:26:21 +13:00
|
|
|
/**
 * Build a Database handle scoped to the 'console' namespace.
 *
 * A connection is popped from the 'console' pool of the registered pool group
 * and combined with the cache produced by getCache(). This function does not
 * hand the connection back; the callers in this file call reclaim() on the
 * pool group once they are finished.
 *
 * @return Database
 */
function getConsoleDB(): Database
{
    global $register;

    /** @var \Utopia\Pools\Group $pools */
    $pools = $register->get('pools');

    $connection = $pools->get('console')->pop();

    $consoleDb = new Database($connection->getResource(), getCache());
    $consoleDb->setNamespace('console');

    return $consoleDb;
}
|
|
|
|
|
|
|
|
/**
 * Build a Database handle for the given project.
 *
 * An empty project document, or one whose ID is 'console', resolves to the
 * console database. Otherwise a connection is popped from the pool named by
 * the project's 'database' attribute and the namespace is set to an
 * underscore followed by the project's internal ID.
 *
 * @param Document $project Project document to resolve the database for.
 *
 * @return Database
 */
function getProjectDB(Document $project): Database
{
    global $register;

    /** @var \Utopia\Pools\Group $pools */
    $pools = $register->get('pools');

    $isConsole = $project->isEmpty() || $project->getId() === 'console';
    if ($isConsole) {
        return getConsoleDB();
    }

    $poolName = $project->getAttribute('database');
    $connection = $pools->get($poolName)->pop();

    $projectDb = new Database($connection->getResource(), getCache());
    $projectDb->setNamespace('_' . $project->getInternalId());

    return $projectDb;
}
|
|
|
|
|
|
|
|
/**
 * Build a Cache backed by a Sharding adapter.
 *
 * One resource is popped from every pool listed under the 'pools-cache'
 * config parameter (an empty list when the parameter is not set) and the
 * collected resources are passed to the Sharding adapter in that order.
 *
 * @return Cache
 */
function getCache(): Cache
{
    global $register;

    /** @var \Utopia\Pools\Group $pools */
    $pools = $register->get('pools');

    $shards = [];
    foreach (Config::getParam('pools-cache', []) as $poolName) {
        $shards[] = $pools->get($poolName)->pop()->getResource();
    }

    return new Cache(new Sharding($shards));
}
|
|
|
|
|
2021-08-19 03:44:11 +12:00
|
|
|
// Shared by the handlers below, which call subscribe()/unsubscribe() on it
// and read its connections.
$realtime = new Realtime();

/**
 * Table for statistics across all workers.
 * Rows are keyed by project ID in the handlers below.
 */
$stats = new Table(4096, 1);

// String columns (64 bytes each).
$stats->column('projectId', Table::TYPE_STRING, 64);
$stats->column('teamId', Table::TYPE_STRING, 64);

// Integer counters.
$stats->column('connections', Table::TYPE_INT);
$stats->column('connectionsTotal', Table::TYPE_INT);
$stats->column('messages', Table::TYPE_INT);

$stats->create();

// Identifier stored in the 'container' attribute of this instance's stats document.
$containerId = uniqid();

// Filled in by the onStart handler once the stats document has been created.
$statsDocument = null;

// Workers = CPU count multiplied by _APP_WORKER_PER_CORE (6 when unset).
$workerNumber = (int) App::getEnv('_APP_WORKER_PER_CORE', 6) * swoole_cpu_num();

$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80));
$adapter
    ->setPackageMaxLength(64000) // Default maximum Package Size (64kb)
    ->setWorkerNumber($workerNumber);

$server = new Server($adapter);
|
2021-06-29 02:34:28 +12:00
|
|
|
|
2022-05-09 19:35:55 +12:00
|
|
|
/**
 * Report an error raised by the realtime server.
 *
 * If a logger is registered, a Log entry describing the error is pushed to it
 * and the returned status code is printed. The error's type, message, file
 * and line are always written to the console, logger or not.
 *
 * @param Throwable $error  The error to report.
 * @param string    $action Label stored on the log entry through setAction().
 */
$logError = function (Throwable $error, string $action) use ($register) {
    $logger = $register->get('logger');
    $errorType = $error::class;

    if ($logger) {
        $log = new Log();

        $log->setNamespace("realtime");
        $log->setServer(\gethostname());
        $log->setVersion(App::getEnv('_APP_VERSION', 'UNKNOWN'));
        $log->setType(Log::TYPE_ERROR);
        $log->setMessage($error->getMessage());

        $log->addTag('code', $error->getCode());
        $log->addTag('verboseType', $errorType);

        $log->addExtra('file', $error->getFile());
        $log->addExtra('line', $error->getLine());
        $log->addExtra('trace', $error->getTraceAsString());
        $log->addExtra('detailedTrace', $error->getTrace());

        $log->setAction($action);

        // Anything other than _APP_ENV=production is reported as staging.
        if (App::getEnv('_APP_ENV', 'development') === 'production') {
            $log->setEnvironment(Log::ENVIRONMENT_PRODUCTION);
        } else {
            $log->setEnvironment(Log::ENVIRONMENT_STAGING);
        }

        $statusCode = $logger->addLog($log);
        Console::info('Realtime log pushed with status code: ' . $statusCode);
    }

    Console::error('[Error] Type: ' . $errorType);
    Console::error('[Error] Message: ' . $error->getMessage());
    Console::error('[Error] File: ' . $error->getFile());
    Console::error('[Error] Line: ' . $error->getLine());
};

// Use the same reporter for errors surfaced by the server itself.
$server->error($logError);
|
2021-06-29 02:34:28 +12:00
|
|
|
|
2022-01-01 02:40:14 +13:00
|
|
|
/**
 * Server start handler.
 *
 * Creates the 'realtime' stats document for this instance (retrying until the
 * collection accepts it) and schedules a 5 second timer that writes the
 * current per-project 'connectionsTotal' values into that document.
 */
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) {
    sleep(5); // wait for the initial database schema to be ready
    Console::success('Server started successfully');

    /**
     * Create document for this worker to share stats across Containers.
     */
    go(function () use ($register, $containerId, &$statsDocument) {
        $database = getConsoleDB();

        // Retry without an upper bound; $attempts is only used in the warning.
        for ($attempts = 1; ; $attempts++) {
            try {
                $document = new Document([
                    '$id' => ID::unique(),
                    '$collection' => ID::custom('realtime'),
                    '$permissions' => [],
                    'container' => $containerId,
                    'timestamp' => DateTime::now(),
                    'value' => '{}'
                ]);

                $statsDocument = Authorization::skip(fn () => $database->createDocument('realtime', $document));
                break;
            } catch (\Throwable $th) {
                Console::warning("Collection not ready. Retrying connection ({$attempts})...");
                sleep(DATABASE_RECONNECT_SLEEP);
            }
        }

        $register->get('pools')->reclaim();
    });

    /**
     * Save current connections to the Database every 5 seconds.
     */
    Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) {
        // Nothing to update until the stats document exists.
        if (empty($statsDocument)) {
            return;
        }

        $payload = [];
        foreach ($stats as $projectId => $value) {
            $payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
        }

        if (empty($payload)) {
            return;
        }

        try {
            $database = getConsoleDB();

            $statsDocument->setAttribute('timestamp', DateTime::now());
            $statsDocument->setAttribute('value', json_encode($payload));

            Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
        } catch (\Throwable $th) {
            $logError($th, "updateWorkerDocument");
        } finally {
            $register->get('pools')->reclaim();
        }
    });
});
|
|
|
|
|
2022-11-14 11:42:35 +13:00
|
|
|
/**
 * Worker start handler.
 *
 * Schedules a 5 second timer that pushes aggregated connection stats (and the
 * SDK test event) to console subscribers, then enters the pub/sub loop that
 * forwards events from the 'realtime' Redis channel to subscribed connections.
 * The loop gives up once 300 consecutive attempts have failed.
 *
 * @param int $workerId ID of the worker that started.
 */
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) {
    Console::success('Worker ' . $workerId . ' started successfully');

    $attempts = 0;
    $start = time();

    Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError) {
        /**
         * Sending current connections to project channels on the console project every 5 seconds.
         */
        if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
            // Same error handling as the stats timer in onStart: failures are
            // reported through $logError and the pooled connection is always
            // reclaimed, even when the query or the send throws.
            try {
                $database = getConsoleDB();

                $payload = [];

                // Only documents refreshed within the last 15 seconds are counted.
                $list = Authorization::skip(fn () => $database->find('realtime', [
                    Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
                ]));

                /**
                 * Aggregate stats across containers.
                 */
                foreach ($list as $document) {
                    foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
                        if (array_key_exists($projectId, $payload)) {
                            $payload[$projectId] += $value;
                        } else {
                            $payload[$projectId] = $value;
                        }
                    }
                }

                foreach ($stats as $projectId => $value) {
                    // Skip projects this instance knows but no recent document reported.
                    if (!array_key_exists($projectId, $payload)) {
                        continue;
                    }

                    $event = [
                        'project' => 'console',
                        'roles' => ['team:' . $stats->get($projectId, 'teamId')],
                        'data' => [
                            'events' => ['stats.connections'],
                            'channels' => ['project'],
                            'timestamp' => DateTime::now(),
                            'payload' => [
                                $projectId => $payload[$projectId]
                            ]
                        ]
                    ];

                    $server->send($realtime->getSubscribers($event), json_encode([
                        'type' => 'event',
                        'data' => $event['data']
                    ]));
                }
            } catch (\Throwable $th) {
                call_user_func($logError, $th, "sendStats");
            } finally {
                $register->get('pools')->reclaim();
            }
        }

        /**
         * Sending test message for SDK E2E tests every 5 seconds.
         */
        if ($realtime->hasSubscriber('console', Role::guests()->toString(), 'tests')) {
            $payload = ['response' => 'WS:/v1/realtime:passed'];

            $event = [
                'project' => 'console',
                'roles' => [Role::guests()->toString()],
                'data' => [
                    'events' => ['test.event'],
                    'channels' => ['tests'],
                    'timestamp' => DateTime::now(),
                    'payload' => $payload
                ]
            ];

            $server->send($realtime->getSubscribers($event), json_encode([
                'type' => 'event',
                'data' => $event['data']
            ]));
        }
    });

    while ($attempts < 300) {
        try {
            if ($attempts > 0) {
                Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
                    Attempting restart in 5 seconds (attempt #' . $attempts . ')');
                sleep(5); // 5 sec delay between connection attempts
            }
            $start = time();

            /** @var Redis $redis */
            $redis = $register->get('pools')->get('pubsub')->pop()->getResource();
            $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

            if ($redis->ping(true)) {
                $attempts = 0;
                Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
            } else {
                Console::error('Pub/sub failed (worker: ' . $workerId . ')');
            }

            $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) {
                $event = json_decode($payload, true);

                // A payload that does not decode to an array cannot be routed;
                // skip it rather than passing it on to the code below, which
                // indexes into the event.
                if (!is_array($event)) {
                    Console::warning("[Worker {$workerId}] Ignoring malformed pub/sub payload");
                    return;
                }

                // When a user's permissions changed, re-subscribe one of their
                // connections with freshly computed roles before routing.
                if (!empty($event['permissionsChanged']) && isset($event['userId'])) {
                    $projectId = $event['project'];
                    $userId = $event['userId'];

                    if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
                        $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
                        $consoleDatabase = getConsoleDB();
                        $project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId));
                        $database = getProjectDB($project);

                        $user = $database->getDocument('users', $userId);

                        $roles = Auth::getRoles($user);

                        $realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);

                        $register->get('pools')->reclaim();
                    }
                }

                $receivers = $realtime->getSubscribers($event);

                if (App::isDevelopment() && !empty($receivers)) {
                    Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
                    Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers));
                    Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
                }

                $server->send(
                    $receivers,
                    json_encode([
                        'type' => 'event',
                        'data' => $event['data']
                    ])
                );

                if (($num = count($receivers)) > 0) {
                    $stats->incr($event['project'], 'messages', $num);
                }
            });
        } catch (\Throwable $th) {
            call_user_func($logError, $th, "pubSubConnection");

            Console::error('Pub/sub error: ' . $th->getMessage());
            $attempts++;
            sleep(DATABASE_RECONNECT_SLEEP);
            continue;
        } finally {
            // Runs on every iteration, including the `continue` above.
            $register->get('pools')->reclaim();
        }
    }

    Console::error('Failed to restart pub/sub...');
});
|
|
|
|
|
2022-11-14 11:42:35 +13:00
|
|
|
/**
 * Connection open handler.
 *
 * Resolves the project and user for the handshake request, applies the
 * connection rate limit and the origin check, subscribes the connection to
 * the requested channels and replies with a 'connected' message. Any failure
 * is reported, sent to the client as an 'error' message, and the connection
 * is closed with the error's code.
 *
 * @param int           $connection Connection identifier.
 * @param SwooleRequest $request    Handshake request.
 */
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $logError) {
    $app = new App('UTC');
    $request = new Request($request);
    $response = new Response(new SwooleResponse());

    Console::info("Connection open (user: {$connection})");

    App::setResource('pools', fn() => $register->get('pools'));
    App::setResource('request', fn() => $request);
    App::setResource('response', fn() => $response);

    try {
        /** @var \Utopia\Database\Document $project */
        $project = $app->getResource('project');
        $projectId = $project->getId();

        /*
         * Project Check
         */
        if (empty($projectId)) {
            throw new Exception('Missing or unknown project ID', 1008);
        }

        $dbForProject = getProjectDB($project);

        /** @var \Utopia\Database\Document $console */
        $console = $app->getResource('console');

        /** @var \Utopia\Database\Document $user */
        $user = $app->getResource('user');

        /*
         * Abuse Check
         *
         * Abuse limits are connecting 128 times per minute and ip address.
         */
        $timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, $dbForProject);
        $timeLimit->setParam('{ip}', $request->getIP());
        $timeLimit->setParam('{url}', $request->getURI());

        $abuse = new Abuse($timeLimit);
        $abuseEnabled = App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled';

        if ($abuseEnabled && $abuse->check()) {
            throw new Exception('Too many requests', 1013);
        }

        /*
         * Validate Client Domain - Check to avoid CSRF attack.
         * Adding Appwrite API domains to allow XDOMAIN communication.
         * Skip this check for non-web platforms which are not required to send an origin header.
         */
        $origin = $request->getOrigin();
        $platforms = \array_merge(
            $project->getAttribute('platforms', []),
            $console->getAttribute('platforms', [])
        );
        $originValidator = new Origin($platforms);
        $originAllowed = $originValidator->isValid($origin);

        if (!$originAllowed && $projectId !== 'console') {
            throw new Exception($originValidator->getDescription(), 1008);
        }

        $roles = Auth::getRoles($user);
        $channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId());

        /**
         * Channels Check
         */
        if (empty($channels)) {
            throw new Exception('Missing channels', 1008);
        }

        $realtime->subscribe($projectId, $connection, $roles, $channels);

        // Connections without a user ID get a null user in the reply.
        $account = null;
        if (!empty($user->getId())) {
            $account = $response->output($user, Response::MODEL_ACCOUNT);
        }

        $server->send([$connection], json_encode([
            'type' => 'connected',
            'data' => [
                'channels' => array_keys($channels),
                'user' => $account
            ]
        ]));

        $stats->set($projectId, [
            'projectId' => $projectId,
            'teamId' => $project->getAttribute('teamId')
        ]);
        $stats->incr($projectId, 'connections');
        $stats->incr($projectId, 'connectionsTotal');
    } catch (\Throwable $th) {
        $logError($th, "initServer");

        $errorPayload = [
            'type' => 'error',
            'data' => [
                'code' => $th->getCode(),
                'message' => $th->getMessage()
            ]
        ];

        $server->send([$connection], json_encode($errorPayload));
        $server->close($connection, $th->getCode());

        if (App::isDevelopment()) {
            Console::error('[Error] Connection Error');
            Console::error('[Error] Code: ' . $errorPayload['data']['code']);
            Console::error('[Error] Message: ' . $errorPayload['data']['message']);
        }
    } finally {
        $register->get('pools')->reclaim();
    }
});
|
|
|
|
|
2022-11-14 11:42:35 +13:00
|
|
|
/**
 * Incoming message handler.
 *
 * Applies the per-connection message rate limit, validates the message
 * envelope and dispatches on its 'type'. The only supported type is
 * 'authentication', which verifies the supplied session and re-subscribes the
 * connection with the authenticated user's roles. Failures are sent back as
 * an 'error' message; the connection is closed only for error code 1008.
 *
 * @param int    $connection Connection identifier.
 * @param string $message    Raw message; expected to be a JSON object with
 *                           'type' and 'data' keys.
 */
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
    try {
        $response = new Response(new SwooleResponse());
        $projectId = $realtime->connections[$connection]['projectId'];
        $database = getConsoleDB();

        if ($projectId !== 'console') {
            $project = Authorization::skip(fn() => $database->getDocument('projects', $projectId));
            $database = getProjectDB($project);
        }

        /*
         * Abuse Check
         *
         * Abuse limits are sending 32 times per minute and connection.
         */
        // NOTE(review): the key template references {url} but only
        // {connection} and {container} are set here - confirm which one the
        // limiter key is meant to use.
        $timeLimit = new TimeLimit('url:{url},connection:{connection}', 32, 60, $database);

        $timeLimit
            ->setParam('{connection}', $connection)
            ->setParam('{container}', $containerId);

        $abuse = new Abuse($timeLimit);

        // Test the env switch first (same order as the onOpen handler) so the
        // limiter is not consulted at all when abuse protection is disabled.
        if (App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
            throw new Exception('Too many messages', 1013);
        }

        $message = json_decode($message, true);

        // Both keys are required. Non-array JSON (null, numbers, strings) is
        // rejected here as well, before array_key_exists() can fail on it.
        if (
            !is_array($message)
            || !array_key_exists('type', $message)
            || !array_key_exists('data', $message)
        ) {
            throw new Exception('Message format is not valid.', 1003);
        }

        switch ($message['type']) {
            /**
             * This type is used to authenticate.
             */
            case 'authentication':
                if (!is_array($message['data']) || !array_key_exists('session', $message['data'])) {
                    throw new Exception('Payload is not valid.', 1003);
                }

                $session = Auth::decodeSession($message['data']['session']);
                Auth::$unique = $session['id'] ?? '';
                Auth::$secret = $session['secret'] ?? '';

                $user = $database->getDocument('users', Auth::$unique);

                if (
                    empty($user->getId()) // Check a document has been found in the DB
                    || !Auth::sessionVerify($user->getAttribute('sessions', []), Auth::$secret) // Validate user has valid login token
                ) {
                    // cookie not valid
                    throw new Exception('Session is not valid.', 1003);
                }

                $roles = Auth::getRoles($user);
                $channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId());
                $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels);

                $user = $response->output($user, Response::MODEL_ACCOUNT);
                $server->send([$connection], json_encode([
                    'type' => 'response',
                    'data' => [
                        'to' => 'authentication',
                        'success' => true,
                        'user' => $user
                    ]
                ]));

                break;

            default:
                throw new Exception('Message type is not valid.', 1003);
        }
    } catch (\Throwable $th) {
        $response = [
            'type' => 'error',
            'data' => [
                'code' => $th->getCode(),
                'message' => $th->getMessage()
            ]
        ];

        $server->send([$connection], json_encode($response));

        // Only code 1008 closes the connection; other errors keep it open.
        if ($th->getCode() === 1008) {
            $server->close($connection, $th->getCode());
        }
    } finally {
        $register->get('pools')->reclaim();
    }
});
|
|
|
|
|
2021-06-30 04:22:10 +12:00
|
|
|
/**
 * Connection close handler.
 *
 * Decrements the project's 'connectionsTotal' counter when the connection is
 * known to the realtime registry, then unsubscribes the connection.
 *
 * @param int $connection Connection identifier.
 */
$server->onClose(function (int $connection) use ($realtime, $stats) {
    $isTracked = array_key_exists($connection, $realtime->connections);

    if ($isTracked) {
        $projectId = $realtime->connections[$connection]['projectId'];
        $stats->decr($projectId, 'connectionsTotal');
    }

    $realtime->unsubscribe($connection);

    Console::info('Connection close: ' . $connection);
});
|
|
|
|
|
2021-06-28 22:18:00 +12:00
|
|
|
// Start the WebSocket server.
$server->start();
|