fix realtime with db refactor
This commit is contained in:
parent
42414e8841
commit
a585a9090a
|
@ -2282,7 +2282,56 @@ $collections = [
|
|||
'orders' => [Database::ORDER_DESC],
|
||||
],
|
||||
],
|
||||
]
|
||||
],
|
||||
'realtime' => [
|
||||
'$collection' => Database::METADATA,
|
||||
'$id' => 'realtime',
|
||||
'name' => 'Realtime Connections',
|
||||
'attributes' => [
|
||||
[
|
||||
'$id' => 'container',
|
||||
'type' => Database::VAR_STRING,
|
||||
'format' => '',
|
||||
'size' => Database::LENGTH_KEY,
|
||||
'signed' => true,
|
||||
'required' => true,
|
||||
'default' => null,
|
||||
'array' => false,
|
||||
'filters' => [],
|
||||
],
|
||||
[
|
||||
'$id' => 'timestamp',
|
||||
'type' => Database::VAR_INTEGER,
|
||||
'format' => '',
|
||||
'size' => 0,
|
||||
'signed' => true,
|
||||
'required' => false,
|
||||
'default' => null,
|
||||
'array' => false,
|
||||
'filters' => [],
|
||||
],
|
||||
[
|
||||
'$id' => 'value',
|
||||
'type' => Database::VAR_STRING,
|
||||
'format' => '',
|
||||
'size' => 16384,
|
||||
'signed' => true,
|
||||
'required' => true,
|
||||
'default' => null,
|
||||
'array' => false,
|
||||
'filters' => [], //TODO: use json filter
|
||||
]
|
||||
],
|
||||
'indexes' => [
|
||||
[
|
||||
'$id' => '_key_timestamp',
|
||||
'type' => Database::INDEX_KEY,
|
||||
'attributes' => ['timestamp'],
|
||||
'lengths' => [],
|
||||
'orders' => [Database::ORDER_DESC],
|
||||
],
|
||||
]
|
||||
],
|
||||
];
|
||||
|
||||
return $collections;
|
|
@ -699,10 +699,10 @@ App::post('/v1/functions/:functionId/executions')
|
|||
]));
|
||||
|
||||
Authorization::reset();
|
||||
|
||||
|
||||
$jwt = ''; // initialize
|
||||
if (!$user->isEmpty()) { // If userId exists, generate a JWT for function
|
||||
|
||||
|
||||
$sessions = $user->getAttribute('sessions', []);
|
||||
$current = new Document();
|
||||
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
<?php
|
||||
|
||||
use Appwrite\Auth\Auth;
|
||||
use Appwrite\Database\Document;
|
||||
use Appwrite\Database\Validator\Authorization;
|
||||
use Appwrite\Messaging\Adapter\Realtime;
|
||||
use Utopia\App;
|
||||
use Utopia\Exception;
|
||||
use Utopia\Abuse\Abuse;
|
||||
use Utopia\Abuse\Adapters\TimeLimit;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Storage\Device\Local;
|
||||
use Utopia\Storage\Storage;
|
||||
|
||||
|
@ -209,11 +209,11 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits
|
|||
$target = Realtime::fromPayload($events->getParam('event'), $payload);
|
||||
|
||||
Realtime::send(
|
||||
$project->getId(),
|
||||
$response->getPayload(),
|
||||
$events->getParam('event'),
|
||||
$target['channels'],
|
||||
$target['roles'],
|
||||
$project->getId(),
|
||||
$response->getPayload(),
|
||||
$events->getParam('event'),
|
||||
$target['channels'],
|
||||
$target['roles'],
|
||||
[
|
||||
'permissionsChanged' => $target['permissionsChanged'],
|
||||
'userId' => $events->getParam('userId')
|
||||
|
@ -221,11 +221,11 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!empty($audits->getParam('event'))) {
|
||||
$audits->trigger();
|
||||
}
|
||||
|
||||
|
||||
if (!empty($deletes->getParam('type')) && !empty($deletes->getParam('document'))) {
|
||||
$deletes->trigger();
|
||||
}
|
||||
|
@ -233,13 +233,13 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits
|
|||
if (!empty($database->getParam('type')) && !empty($database->getParam('document'))) {
|
||||
$database->trigger();
|
||||
}
|
||||
|
||||
|
||||
$route = $utopia->match($request);
|
||||
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled'
|
||||
&& $project->getId()
|
||||
&& $mode !== APP_MODE_ADMIN // TODO: add check to make sure user is admin
|
||||
&& !empty($route->getLabel('sdk.namespace', null))) { // Don't calculate console usage on admin mode
|
||||
|
||||
|
||||
$usage
|
||||
->setParam('networkRequestSize', $request->getSize() + $usage->getParam('storage'))
|
||||
->setParam('networkResponseSize', $response->getSize())
|
||||
|
|
|
@ -131,7 +131,6 @@ $http->on('start', function (Server $http) use ($payloadSize, $register) {
|
|||
}
|
||||
|
||||
$dbForConsole->createCollection($key, $attributes, $indexes);
|
||||
|
||||
}
|
||||
|
||||
Console::success('[Setup] - Server database init completed...');
|
||||
|
|
|
@ -28,11 +28,9 @@ use Appwrite\Event\Event;
|
|||
use Appwrite\Network\Validator\Email;
|
||||
use Appwrite\Network\Validator\IP;
|
||||
use Appwrite\Network\Validator\URL;
|
||||
use Appwrite\Event\Realtime;
|
||||
use Appwrite\OpenSSL\OpenSSL;
|
||||
use Appwrite\Stats\Stats;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\View;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Locale\Locale;
|
||||
|
|
284
app/realtime.php
284
app/realtime.php
|
@ -18,6 +18,10 @@ use Utopia\Database\Database;
|
|||
use Utopia\Cache\Adapter\Redis as RedisCache;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\Database\Adapter\MariaDB;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\Swoole\Request;
|
||||
use Utopia\WebSocket\Server;
|
||||
use Utopia\WebSocket\Adapter;
|
||||
|
@ -40,124 +44,121 @@ $stats->column('messages', Table::TYPE_INT);
|
|||
$stats->create();
|
||||
|
||||
$containerId = uniqid();
|
||||
$documentId = null;
|
||||
$statsDocument = null;
|
||||
|
||||
$adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80));
|
||||
$adapter->setPackageMaxLength(64000); // Default maximum Package Size (64kb)
|
||||
|
||||
$server = new Server($adapter);
|
||||
|
||||
$server->onStart(function () use ($stats, $register, $containerId, &$documentId) {
|
||||
function getDatabase(Registry &$register, string $namespace)
|
||||
{
|
||||
$db = $register->get('dbPool')->get();
|
||||
$redis = $register->get('redisPool')->get();
|
||||
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setNamespace($namespace);
|
||||
|
||||
return [
|
||||
$database,
|
||||
function () use ($register, $db, $redis) {
|
||||
$register->get('dbPool')->put($db);
|
||||
$register->get('redisPool')->put($redis);
|
||||
}
|
||||
];
|
||||
};
|
||||
|
||||
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument) {
|
||||
Console::success('Server started succefully');
|
||||
|
||||
// $getConsoleDb = function () use ($register) {
|
||||
// $db = $register->get('dbPool')->get();
|
||||
// $cache = $register->get('redisPool')->get();
|
||||
|
||||
// $cache = new Cache(new RedisCache($cache));
|
||||
// $database = new Database(new MariaDB($db), $cache);
|
||||
|
||||
// return [
|
||||
// $database,
|
||||
// function () use ($register, $db, $cache) {
|
||||
// $register->get('dbPool')->put($db);
|
||||
// $register->get('redisPool')->put($cache);
|
||||
// }
|
||||
// ];
|
||||
// };
|
||||
|
||||
/**
|
||||
* Create document for this worker to share stats across Containers.
|
||||
*/
|
||||
// go(function () use ($getConsoleDb, $containerId, &$documentId) {
|
||||
// try {
|
||||
// [$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb);
|
||||
// // $document = [
|
||||
// // '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS,
|
||||
// // '$permissions' => [
|
||||
// // 'read' => ['*'],
|
||||
// // 'write' => ['*'],
|
||||
// // ],
|
||||
// // 'container' => $containerId,
|
||||
// // 'timestamp' => time(),
|
||||
// // 'value' => '{}'
|
||||
// // ];
|
||||
// // Authorization::disable();
|
||||
// // $document = $consoleDb->createDocument($document);
|
||||
// // Authorization::enable();
|
||||
// // $documentId = $document->getId();
|
||||
// } catch (\Throwable $th) {
|
||||
// Console::error('[Error] Type: ' . get_class($th));
|
||||
// Console::error('[Error] Message: ' . $th->getMessage());
|
||||
// Console::error('[Error] File: ' . $th->getFile());
|
||||
// Console::error('[Error] Line: ' . $th->getLine());
|
||||
// } finally {
|
||||
// call_user_func($returnConsoleDb);
|
||||
// }
|
||||
// });
|
||||
go(function () use ($register, $containerId, &$statsDocument) {
|
||||
try {
|
||||
[$database, $returnDatabase] = getDatabase($register, 'project_console_internal');
|
||||
$document = new Document([
|
||||
'$id' => $database->getId(),
|
||||
'$collection' => 'realtime',
|
||||
'$read' => [],
|
||||
'$write' => [],
|
||||
'container' => $containerId,
|
||||
'timestamp' => time(),
|
||||
'value' => '{}'
|
||||
]);
|
||||
$statsDocument = Authorization::skip(function () use ($database, $document) {
|
||||
return $database->createDocument('realtime', $document);
|
||||
});
|
||||
} catch (\Throwable $th) {
|
||||
Console::error('[Error] Type: ' . get_class($th));
|
||||
Console::error('[Error] Message: ' . $th->getMessage());
|
||||
Console::error('[Error] File: ' . $th->getFile());
|
||||
Console::error('[Error] Line: ' . $th->getLine());
|
||||
} finally {
|
||||
call_user_func($returnDatabase);
|
||||
}
|
||||
});
|
||||
|
||||
// /**
|
||||
// * Save current connections to the Database every 5 seconds.
|
||||
// */
|
||||
// Timer::tick(5000, function () use ($stats, $getConsoleDb, $containerId, &$documentId) {
|
||||
// foreach ($stats as $projectId => $value) {
|
||||
// if (empty($value['connections']) && empty($value['messages'])) {
|
||||
// continue;
|
||||
// }
|
||||
/**
|
||||
* Save current connections to the Database every 5 seconds.
|
||||
*/
|
||||
Timer::tick(5000, function () use ($register, $stats, $containerId, &$statsDocument) {
|
||||
/** @var Document $statsDocument */
|
||||
foreach ($stats as $projectId => $value) {
|
||||
if (empty($value['connections']) && empty($value['messages'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// $connections = $stats->get($projectId, 'connections');
|
||||
// $messages = $stats->get($projectId, 'messages');
|
||||
$connections = $stats->get($projectId, 'connections');
|
||||
$messages = $stats->get($projectId, 'messages');
|
||||
|
||||
// $usage = new Event('v1-usage', 'UsageV1');
|
||||
// $usage
|
||||
// ->setParam('projectId', $projectId)
|
||||
// ->setParam('realtimeConnections', $connections)
|
||||
// ->setParam('realtimeMessages', $messages)
|
||||
// ->setParam('networkRequestSize', 0)
|
||||
// ->setParam('networkResponseSize', 0);
|
||||
$usage = new Event('v1-usage', 'UsageV1');
|
||||
$usage
|
||||
->setParam('projectId', $projectId)
|
||||
->setParam('realtimeConnections', $connections)
|
||||
->setParam('realtimeMessages', $messages)
|
||||
->setParam('networkRequestSize', 0)
|
||||
->setParam('networkResponseSize', 0);
|
||||
|
||||
// $stats->set($projectId, [
|
||||
// 'messages' => 0,
|
||||
// 'connections' => 0
|
||||
// ]);
|
||||
$stats->set($projectId, [
|
||||
'messages' => 0,
|
||||
'connections' => 0
|
||||
]);
|
||||
|
||||
// if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
// $usage->trigger();
|
||||
// }
|
||||
// }
|
||||
// $payload = [];
|
||||
// foreach ($stats as $projectId => $value) {
|
||||
// if (!empty($value['connectionsTotal'])) {
|
||||
// $payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
|
||||
// }
|
||||
// }
|
||||
// if (empty($payload)) {
|
||||
// return;
|
||||
// }
|
||||
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
$usage->trigger();
|
||||
}
|
||||
}
|
||||
$payload = [];
|
||||
foreach ($stats as $projectId => $value) {
|
||||
if (!empty($value['connectionsTotal'])) {
|
||||
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
|
||||
}
|
||||
}
|
||||
if (empty($payload) || empty($statsDocument)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// try {
|
||||
// [$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb);
|
||||
try {
|
||||
[$database, $returnDatabase] = getDatabase($register, 'project_console_internal');
|
||||
|
||||
// // $consoleDb->updateDocument([
|
||||
// // '$id' => $documentId,
|
||||
// // '$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS,
|
||||
// // '$permissions' => [
|
||||
// // 'read' => ['*'],
|
||||
// // 'write' => ['*'],
|
||||
// // ],
|
||||
// // 'container' => $containerId,
|
||||
// // 'timestamp' => time(),
|
||||
// // 'value' => json_encode($payload)
|
||||
// // ]);
|
||||
// } catch (\Throwable $th) {
|
||||
// Console::error('[Error] Type: ' . get_class($th));
|
||||
// Console::error('[Error] Message: ' . $th->getMessage());
|
||||
// Console::error('[Error] File: ' . $th->getFile());
|
||||
// Console::error('[Error] Line: ' . $th->getLine());
|
||||
// } finally {
|
||||
// call_user_func($returnConsoleDb);
|
||||
// }
|
||||
// });
|
||||
$statsDocument
|
||||
->setAttribute('timestamp', time())
|
||||
->setAttribute('value', json_encode($payload));
|
||||
|
||||
Authorization::skip(function () use ($database, $statsDocument) {
|
||||
$database->updateDocument('realtime', $statsDocument->getId(), $statsDocument);
|
||||
});
|
||||
} catch (\Throwable $th) {
|
||||
Console::error('[Error] Type: ' . get_class($th));
|
||||
Console::error('[Error] Message: ' . $th->getMessage());
|
||||
Console::error('[Error] File: ' . $th->getFile());
|
||||
Console::error('[Error] Line: ' . $th->getLine());
|
||||
} finally {
|
||||
call_user_func($returnDatabase);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) {
|
||||
|
@ -171,21 +172,16 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
|||
* Sending current connections to project channels on the console project every 5 seconds.
|
||||
*/
|
||||
if ($realtime->hasSubscriber('console', 'role:member', 'project')) {
|
||||
$db = $register->get('dbPool')->get();
|
||||
$redis = $register->get('redisPool')->get();
|
||||
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setNamespace('project_console_internal');
|
||||
[$database, $returnDatabase] = getDatabase($register, 'project_console_internal');
|
||||
|
||||
$payload = [];
|
||||
$list = [];
|
||||
// $list = $consoleDb->getCollection([
|
||||
// 'filters' => [
|
||||
// '$collection=' . Database::SYSTEM_COLLECTION_CONNECTIONS,
|
||||
// 'timestamp>' . (time() - 15)
|
||||
// ],
|
||||
// ]);
|
||||
|
||||
$list = Authorization::skip(function () use ($database) {
|
||||
return $database->find('realtime', [
|
||||
new Query('timestamp', Query::TYPE_GREATER, [(time() - 15)])
|
||||
]);
|
||||
});
|
||||
|
||||
/**
|
||||
* Aggregate stats across containers.
|
||||
|
@ -224,8 +220,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
|||
]));
|
||||
}
|
||||
|
||||
$register->get('dbPool')->put($db);
|
||||
$register->get('redisPool')->put($redis);
|
||||
call_user_func($returnDatabase);
|
||||
}
|
||||
/**
|
||||
* Sending test message for SDK E2E tests every 5 seconds.
|
||||
|
@ -284,12 +279,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
|||
return;
|
||||
}
|
||||
|
||||
$db = $register->get('dbPool')->get();
|
||||
$cache = $register->get('redisPool')->get();
|
||||
|
||||
$cache = new Cache(new RedisCache($cache));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setNamespace('project_' . $projectId .'_internal');
|
||||
[$database, $returnDatabase] = getDatabase($register, 'project_' . $projectId . '_internal');
|
||||
|
||||
$user = $database->getDocument('users', $userId);
|
||||
|
||||
|
@ -297,8 +287,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
|||
|
||||
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
|
||||
|
||||
$register->get('dbPool')->put($db);
|
||||
$register->get('redisPool')->put($cache);
|
||||
call_user_func($returnDatabase);
|
||||
}
|
||||
|
||||
$receivers = $realtime->getSubscribers($event);
|
||||
|
@ -374,7 +363,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
|||
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setNamespace('project_' . $project->getId() .'_internal');
|
||||
$database->setNamespace('project_' . $project->getId() . '_internal');
|
||||
|
||||
/*
|
||||
* Project Check
|
||||
|
@ -388,17 +377,16 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
|||
*
|
||||
* Abuse limits are connecting 128 times per minute and ip address.
|
||||
*/
|
||||
// $timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, $database);
|
||||
// $timeLimit
|
||||
// ->setParam('{ip}', $request->getIP())
|
||||
// ->setParam('{url}', $request->getURI())
|
||||
// ->setup();
|
||||
$timeLimit = new TimeLimit('url:{url},ip:{ip}', 128, 60, $database);
|
||||
$timeLimit
|
||||
->setParam('{ip}', $request->getIP())
|
||||
->setParam('{url}', $request->getURI());
|
||||
|
||||
// $abuse = new Abuse($timeLimit);
|
||||
$abuse = new Abuse($timeLimit);
|
||||
|
||||
// if ($abuse->check() && App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled') {
|
||||
// throw new Exception('Too many requests', 1013);
|
||||
// }
|
||||
if ($abuse->check() && App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled') {
|
||||
throw new Exception('Too many requests', 1013);
|
||||
}
|
||||
|
||||
/*
|
||||
* Validate Client Domain - Check to avoid CSRF attack.
|
||||
|
@ -457,7 +445,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
|
|||
Console::error('[Error] Connection Error');
|
||||
Console::error('[Error] Code: ' . $response['data']['code']);
|
||||
Console::error('[Error] Message: ' . $response['data']['message']);
|
||||
var_dump($th->getFile(), $th->getLine(), $th->getTraceAsString());
|
||||
}
|
||||
|
||||
if ($th instanceof PDOException) {
|
||||
|
@ -476,28 +463,27 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
|||
try {
|
||||
$response = new Response(new SwooleResponse());
|
||||
$db = $register->get('dbPool')->get();
|
||||
$cache = $register->get('redisPool')->get();
|
||||
$redis = $register->get('redisPool')->get();
|
||||
|
||||
$cache = new Cache(new RedisCache($cache));
|
||||
$cache = new Cache(new RedisCache($redis));
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setNamespace('project_' . $realtime->connections[$connection]['projectId'] .'_internal');
|
||||
$database->setNamespace('project_' . $realtime->connections[$connection]['projectId'] . '_internal');
|
||||
|
||||
/*
|
||||
* Abuse Check
|
||||
*
|
||||
* Abuse limits are sending 32 times per minute and connection.
|
||||
*/
|
||||
// $timeLimit = new TimeLimit('url:{url},conection:{connection}', 32, 60, $database);
|
||||
// $timeLimit
|
||||
// ->setParam('{connection}', $connection)
|
||||
// ->setParam('{container}', $containerId)
|
||||
// ->setup();
|
||||
$timeLimit = new TimeLimit('url:{url},conection:{connection}', 32, 60, $database);
|
||||
$timeLimit
|
||||
->setParam('{connection}', $connection)
|
||||
->setParam('{container}', $containerId);
|
||||
|
||||
// $abuse = new Abuse($timeLimit);
|
||||
$abuse = new Abuse($timeLimit);
|
||||
|
||||
// if ($abuse->check() && App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled') {
|
||||
// throw new Exception('Too many messages', 1013);
|
||||
// }
|
||||
if ($abuse->check() && App::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled') {
|
||||
throw new Exception('Too many messages', 1013);
|
||||
}
|
||||
|
||||
$message = json_decode($message, true);
|
||||
|
||||
|
@ -506,7 +492,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
|||
}
|
||||
|
||||
switch ($message['type']) {
|
||||
/**
|
||||
/**
|
||||
* This type is used to authenticate.
|
||||
*/
|
||||
case 'authentication':
|
||||
|
@ -515,8 +501,8 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
|||
}
|
||||
|
||||
$session = Auth::decodeSession($message['data']['session']);
|
||||
Auth::$unique = $session['id'];
|
||||
Auth::$secret = $session['secret'];
|
||||
Auth::$unique = $session['id'] ?? '';
|
||||
Auth::$secret = $session['secret'] ?? '';
|
||||
|
||||
$user = $database->getDocument('users', Auth::$unique);
|
||||
|
||||
|
@ -564,7 +550,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
|||
}
|
||||
} finally {
|
||||
$register->get('dbPool')->put($db);
|
||||
$register->get('redisPool')->put($cache);
|
||||
$register->get('redisPool')->put($redis);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -184,7 +184,7 @@ class Realtime extends Adapter
|
|||
*/
|
||||
if (
|
||||
\array_key_exists($channel, $this->subscriptions[$event['project']][$role])
|
||||
&& (\in_array($role, $event['roles']) || \in_array('*', $event['roles']))
|
||||
&& (\in_array($role, $event['roles']) || \in_array('role:all', $event['roles']))
|
||||
) {
|
||||
/**
|
||||
* Saving all connections that are allowed to receive this event.
|
||||
|
@ -277,28 +277,29 @@ class Realtime extends Adapter
|
|||
case strpos($event, 'database.collections.') === 0:
|
||||
$channels[] = 'collections';
|
||||
$channels[] = 'collections.' . $payload->getId();
|
||||
$roles = $payload->getAttribute('$permissions.read');
|
||||
$roles = $payload->getRead();
|
||||
|
||||
break;
|
||||
case strpos($event, 'database.documents.') === 0:
|
||||
$channels[] = 'documents';
|
||||
$channels[] = 'collections.' . $payload->getAttribute('$collection') . '.documents';
|
||||
$channels[] = 'documents.' . $payload->getId();
|
||||
$roles = $payload->getAttribute('$permissions.read');
|
||||
$roles = $payload->getRead();
|
||||
|
||||
break;
|
||||
case strpos($event, 'storage.') === 0:
|
||||
$channels[] = 'files';
|
||||
$channels[] = 'files.' . $payload->getId();
|
||||
$roles = $payload->getAttribute('$permissions.read');
|
||||
$roles = $payload->getRead();
|
||||
|
||||
break;
|
||||
case strpos($event, 'functions.executions.') === 0:
|
||||
if (!empty($payload->getAttribute('$permissions.read'))) {
|
||||
\var_dump($payload->getArrayCopy());
|
||||
if (!empty($payload->getRead())) {
|
||||
$channels[] = 'executions';
|
||||
$channels[] = 'executions.' . $payload->getId();
|
||||
$channels[] = 'functions.' . $payload->getAttribute('functionId');
|
||||
$roles = $payload->getAttribute('$permissions.read');
|
||||
$roles = $payload->getRead();
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -16,12 +16,12 @@ class Execution extends Model
|
|||
'default' => '',
|
||||
'example' => '5e5ea5c16897e',
|
||||
])
|
||||
->addRule('$permissions', [
|
||||
'type' => Response::MODEL_PERMISSIONS,
|
||||
'description' => 'Execution permissions.',
|
||||
'default' => new \stdClass,
|
||||
'example' => new \stdClass,
|
||||
'array' => false,
|
||||
->addRule('$read', [
|
||||
'type' => self::TYPE_STRING,
|
||||
'description' => 'Execution read permissions.',
|
||||
'default' => '',
|
||||
'example' => 'role:all',
|
||||
'array' => true,
|
||||
])
|
||||
->addRule('functionId', [
|
||||
'type' => self::TYPE_STRING,
|
||||
|
|
|
@ -190,7 +190,6 @@ trait RealtimeBase
|
|||
$user = $this->getUser();
|
||||
$userId = $user['$id'] ?? '';
|
||||
$session = $user['session'] ?? '';
|
||||
$projectId = $this->getProject()['$id'];
|
||||
|
||||
/**
|
||||
* Test for SUCCESS
|
||||
|
@ -617,27 +616,11 @@ trait RealtimeBase
|
|||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
'x-appwrite-key' => $this->getProject()['apiKey']
|
||||
]), [
|
||||
'collectionId' => 'unique()',
|
||||
'name' => 'Actors',
|
||||
'read' => ['*'],
|
||||
'write' => ['*'],
|
||||
'rules' => [
|
||||
[
|
||||
'label' => 'First Name',
|
||||
'key' => 'firstName',
|
||||
'type' => 'text',
|
||||
'default' => '',
|
||||
'required' => true,
|
||||
'array' => false
|
||||
],
|
||||
[
|
||||
'label' => 'Last Name',
|
||||
'key' => 'lastName',
|
||||
'type' => 'text',
|
||||
'default' => '',
|
||||
'required' => true,
|
||||
'array' => false
|
||||
],
|
||||
],
|
||||
'read' => ['role:all'],
|
||||
'write' => ['role:all'],
|
||||
'permission' => 'collection'
|
||||
]);
|
||||
|
||||
$response = json_decode($client->receive(), true);
|
||||
|
@ -655,6 +638,24 @@ trait RealtimeBase
|
|||
|
||||
$data = ['actorsId' => $actors['body']['$id']];
|
||||
|
||||
$name = $this->client->call(Client::METHOD_POST, '/database/collections/' . $data['actorsId'] . '/attributes/string', array_merge([
|
||||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
'x-appwrite-key' => $this->getProject()['apiKey']
|
||||
]), [
|
||||
'attributeId' => 'name',
|
||||
'size' => 256,
|
||||
'required' => true,
|
||||
]);
|
||||
|
||||
$this->assertEquals($name['headers']['status-code'], 201);
|
||||
$this->assertEquals($name['body']['key'], 'name');
|
||||
$this->assertEquals($name['body']['type'], 'string');
|
||||
$this->assertEquals($name['body']['size'], 256);
|
||||
$this->assertEquals($name['body']['required'], true);
|
||||
|
||||
sleep(2);
|
||||
|
||||
/**
|
||||
* Test Document Create
|
||||
*/
|
||||
|
@ -662,12 +663,12 @@ trait RealtimeBase
|
|||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
], $this->getHeaders()), [
|
||||
'documentId' => 'unique()',
|
||||
'data' => [
|
||||
'firstName' => 'Chris',
|
||||
'lastName' => 'Evans',
|
||||
'name' => 'Chris Evans'
|
||||
],
|
||||
'read' => ['*'],
|
||||
'write' => ['*'],
|
||||
'read' => ['role:all'],
|
||||
'write' => ['role:all'],
|
||||
]);
|
||||
|
||||
$response = json_decode($client->receive(), true);
|
||||
|
@ -683,6 +684,7 @@ trait RealtimeBase
|
|||
$this->assertContains('collections.' . $actors['body']['$id'] . '.documents', $response['data']['channels']);
|
||||
$this->assertEquals('database.documents.create', $response['data']['event']);
|
||||
$this->assertNotEmpty($response['data']['payload']);
|
||||
$this->assertEquals($response['data']['payload']['name'], 'Chris Evans');
|
||||
|
||||
$data['documentId'] = $document['body']['$id'];
|
||||
|
||||
|
@ -693,12 +695,12 @@ trait RealtimeBase
|
|||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
], $this->getHeaders()), [
|
||||
'documentId' => 'unique()',
|
||||
'data' => [
|
||||
'firstName' => 'Chris1',
|
||||
'lastName' => 'Evans2',
|
||||
'name' => 'Chris Evans 2'
|
||||
],
|
||||
'read' => ['*'],
|
||||
'write' => ['*'],
|
||||
'read' => ['role:all'],
|
||||
'write' => ['role:all'],
|
||||
]);
|
||||
|
||||
$response = json_decode($client->receive(), true);
|
||||
|
@ -715,8 +717,8 @@ trait RealtimeBase
|
|||
$this->assertEquals('database.documents.update', $response['data']['event']);
|
||||
$this->assertNotEmpty($response['data']['payload']);
|
||||
|
||||
$this->assertEquals($response['data']['payload']['firstName'], 'Chris1');
|
||||
$this->assertEquals($response['data']['payload']['lastName'], 'Evans2');
|
||||
$this->assertEquals($response['data']['payload']['name'], 'Chris Evans 2');
|
||||
|
||||
|
||||
/**
|
||||
* Test Document Delete
|
||||
|
@ -725,13 +727,12 @@ trait RealtimeBase
|
|||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
], $this->getHeaders()), [
|
||||
'documentId' => 'unique()',
|
||||
'data' => [
|
||||
'firstName' => 'Bradly',
|
||||
'lastName' => 'Cooper',
|
||||
|
||||
'name' => 'Bradley Cooper'
|
||||
],
|
||||
'read' => ['*'],
|
||||
'write' => ['*'],
|
||||
'read' => ['role:all'],
|
||||
'write' => ['role:all'],
|
||||
]);
|
||||
|
||||
$client->receive();
|
||||
|
@ -754,6 +755,7 @@ trait RealtimeBase
|
|||
$this->assertContains('collections.' . $data['actorsId'] . '.documents', $response['data']['channels']);
|
||||
$this->assertEquals('database.documents.delete', $response['data']['event']);
|
||||
$this->assertNotEmpty($response['data']['payload']);
|
||||
$this->assertEquals($response['data']['payload']['name'], 'Bradley Cooper');
|
||||
|
||||
$client->close();
|
||||
}
|
||||
|
@ -786,9 +788,10 @@ trait RealtimeBase
|
|||
'content-type' => 'multipart/form-data',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
], $this->getHeaders()), [
|
||||
'fileId' => 'unique()',
|
||||
'file' => new CURLFile(realpath(__DIR__ . '/../../../resources/logo.png'), 'image/png', 'logo.png'),
|
||||
'read' => ['*'],
|
||||
'write' => ['*'],
|
||||
'read' => ['role:all'],
|
||||
'write' => ['role:all'],
|
||||
'folderId' => 'xyz',
|
||||
]);
|
||||
|
||||
|
@ -814,8 +817,8 @@ trait RealtimeBase
|
|||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
], $this->getHeaders()), [
|
||||
'read' => ['*'],
|
||||
'write' => ['*'],
|
||||
'read' => ['role:all'],
|
||||
'write' => ['role:all'],
|
||||
]);
|
||||
|
||||
$response = json_decode($client->receive(), true);
|
||||
|
@ -878,16 +881,17 @@ trait RealtimeBase
|
|||
$this->assertEquals($user['$id'], $response['data']['user']['$id']);
|
||||
|
||||
/**
|
||||
* Test File Create
|
||||
* Test Functions Create
|
||||
*/
|
||||
$function = $this->client->call(Client::METHOD_POST, '/functions', array_merge([
|
||||
$function = $this->client->call(Client::METHOD_POST, '/functions', [
|
||||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $this->getProject()['$id'],
|
||||
'x-appwrite-key' => $this->getProject()['apiKey']
|
||||
]), [
|
||||
], [
|
||||
'functionId' => 'unique()',
|
||||
'name' => 'Test',
|
||||
'execute' => ['role:member'],
|
||||
'runtime' => 'php-8.0',
|
||||
'execute' => ['*'],
|
||||
'timeout' => 10,
|
||||
]);
|
||||
|
||||
|
@ -988,6 +992,7 @@ trait RealtimeBase
|
|||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $projectId,
|
||||
], $this->getHeaders()), [
|
||||
'teamId' => 'unique()',
|
||||
'name' => 'Arsenal'
|
||||
]);
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
namespace Appwrite\Tests;
|
||||
|
||||
use Appwrite\Auth\Auth;
|
||||
use Appwrite\Database\Document;
|
||||
use Utopia\Database\Document;
|
||||
use Appwrite\Messaging\Adapter\Realtime;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
|
||||
|
@ -146,14 +146,14 @@ class MessagingChannelsTest extends TestCase
|
|||
}
|
||||
|
||||
/**
|
||||
* Tests Wildcard (*) Permissions on every channel.
|
||||
* Tests Wildcard (role:all) Permissions on every channel.
|
||||
*/
|
||||
public function testWildcardPermission()
|
||||
{
|
||||
foreach ($this->allChannels as $index => $channel) {
|
||||
$event = [
|
||||
'project' => '1',
|
||||
'roles' => ['*'],
|
||||
'roles' => ['role:all'],
|
||||
'data' => [
|
||||
'channels' => [
|
||||
0 => $channel,
|
||||
|
|
|
@ -20,7 +20,7 @@ class MessagingGuestTest extends TestCase
|
|||
|
||||
$event = [
|
||||
'project' => '1',
|
||||
'roles' => ['*'],
|
||||
'roles' => ['role:all'],
|
||||
'data' => [
|
||||
'channels' => [
|
||||
0 => 'documents',
|
||||
|
@ -95,7 +95,7 @@ class MessagingGuestTest extends TestCase
|
|||
|
||||
$this->assertEmpty($receivers);
|
||||
|
||||
$event['roles'] = ['*'];
|
||||
$event['roles'] = ['role:all'];
|
||||
$event['data']['channels'] = ['documents.123'];
|
||||
|
||||
$receivers = $realtime->getSubscribers($event);
|
||||
|
|
|
@ -29,7 +29,7 @@ class MessagingTest extends TestCase
|
|||
|
||||
$event = [
|
||||
'project' => '1',
|
||||
'roles' => ['*'],
|
||||
'roles' => ['role:all'],
|
||||
'data' => [
|
||||
'channels' => [
|
||||
0 => 'account.123',
|
||||
|
@ -103,7 +103,7 @@ class MessagingTest extends TestCase
|
|||
|
||||
$this->assertEmpty($receivers);
|
||||
|
||||
$event['roles'] = ['*'];
|
||||
$event['roles'] = ['role:all'];
|
||||
$event['data']['channels'] = ['documents.123'];
|
||||
|
||||
$receivers = $realtime->getSubscribers($event);
|
||||
|
|
Loading…
Reference in a new issue