1
0
Fork 0
mirror of synced 2024-05-06 13:52:38 +12:00

remove realtime Parser class

This commit is contained in:
Torsten Dittmann 2021-06-28 16:34:28 +02:00
parent 06674982df
commit 613d33321c
7 changed files with 244 additions and 256 deletions

View file

@ -5,8 +5,9 @@ use Appwrite\Database\Adapter\Redis as RedisAdapter;
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
use Appwrite\Database\Database;
use Appwrite\Event\Event;
use Appwrite\Event\Realtime as RealtimeEvent;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Network\Validator\Origin;
use Appwrite\Realtime\Parser;
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Process;
@ -44,6 +45,8 @@ $stats->create();
$server = new Server($adapter);
$realtime = new Realtime();
$server->onStart(function (SwooleServer $server) use ($stats) {
Console::success('Server started succefully');
Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}");
@ -83,7 +86,7 @@ $server->onStart(function (SwooleServer $server) use ($stats) {
});
});
$server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use ($server, $register, $stats, &$subscriptions, &$connections) {
$server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use ($server, $register, $stats, $realtime) {
Console::success('Worker ' . $workerId . ' started succefully');
$attempts = 0;
@ -93,22 +96,22 @@ $server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
Timer::tick(5000, function () use ($server, $stats, &$subscriptions) {
if (
array_key_exists('console', $subscriptions)
&& array_key_exists('role:member', $subscriptions['console'])
&& array_key_exists('project', $subscriptions['console']['role:member'])
) {
Timer::tick(5000, function () use ($server, $stats, $realtime) {
if ($realtime->hasSubscriber('console', 'role:member', 'project')) {
$payload = [];
foreach ($stats as $projectId => $value) {
$payload[$projectId] = $value['connectionsTotal'];
}
$server->send(array_keys($subscriptions['console']['role:member']['project']), json_encode([
$event = [
'event' => 'stats.connections',
'channels' => ['project'],
'permissions' => ['role:member'],
'timestamp' => time(),
'payload' => $payload
]));
];
$server->send($realtime->getReceivers($event), json_encode($event));
}
});
@ -132,43 +135,38 @@ $server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, $stats, $register, &$connections, &$subscriptions) {
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, $stats, $register, $realtime) {
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {
$project = $event['project'];
$projectId = $event['project'];
$userId = $event['userId'];
if (array_key_exists($project, $subscriptions) && array_key_exists('user:' . $userId, $subscriptions[$project])) {
$connection = array_key_first(reset($subscriptions[$project]['user:' . $userId]));
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
} else {
return;
}
/**
* This is redundant soon and will be gone with merging the usage branch.
*/
$db = $register->get('dbPool')->get();
$cache = $register->get('redisPool')->get();
$projectDB = new Database();
$projectDB->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
$projectDB->setNamespace('app_' . $project);
$projectDB->setNamespace('app_' . $projectId);
$projectDB->setMocks(Config::getParam('collections', []));
$user = $projectDB->getDocument($userId);
Parser::setUser($user);
$roles = Auth::getRoles($user);
Parser::subscribe($project, $connection, $roles, $subscriptions, $connections, $connections[$connection]['channels']);
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
$register->get('dbPool')->put($db);
$register->get('redisPool')->put($cache);
}
$receivers = Parser::identifyReceivers($event, $subscriptions);
$receivers = $realtime->getReceivers($event);
// Temporarily print debug logs by default for Alpha testing.
// if (App::isDevelopment() && !empty($receivers)) {
@ -200,7 +198,7 @@ $server->onWorkerStart(function (SwooleServer $swooleServer, int $workerId) use
Console::error('Failed to restart pub/sub...');
});
$server->onOpen(function (SwooleServer $swooleServer, SwooleRequest $request) use ($server, $register, $stats, &$subscriptions, &$connections) {
$server->onOpen(function (SwooleServer $swooleServer, SwooleRequest $request) use ($server, $register, $stats, &$realtime) {
$app = new App('UTC');
$connection = $request->fd;
$request = new Request($request);
@ -276,10 +274,12 @@ $server->onOpen(function (SwooleServer $swooleServer, SwooleRequest $request) us
throw new Exception($originValidator->getDescription(), 1008);
}
Parser::setUser($user);
$roles = [
'role:' . (($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER),
...Auth::getRoles($user)
];
$roles = Parser::getRoles();
$channels = Parser::parseChannels($request->getQuery('channels', []));
$channels = RealtimeEvent::convertChannels($request->getQuery('channels', []), $user);
/**
* Channels Check
@ -288,7 +288,7 @@ $server->onOpen(function (SwooleServer $swooleServer, SwooleRequest $request) us
throw new Exception('Missing channels', 1008);
}
Parser::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels);
$realtime->subscribe($project->getId(), $connection, $roles, $channels);
$server->send([$connection], json_encode($channels));
@ -322,11 +322,12 @@ $server->onMessage(function (SwooleServer $swooleServer, Frame $frame) use ($ser
$server->close($connection, 1003);
});
$server->onClose(function (SwooleServer $server, int $connection) use (&$connections, &$subscriptions, $stats) {
if (array_key_exists($connection, $connections)) {
$stats->decr($connections[$connection]['projectId'], 'connectionsTotal');
$server->onClose(function (SwooleServer $server, int $connection) use ($realtime, $stats) {
if (array_key_exists($connection, $realtime->connections)) {
$stats->decr($realtime->connections[$connection]['projectId'], 'connectionsTotal');
}
Parser::unsubscribe($connection, $subscriptions, $connections);
$realtime->unsubscribe($connection);
Console::info('Connection close: ' . $connection);
});

View file

@ -280,10 +280,10 @@ class Auth
*/
public static function getRoles(Document $user): array
{
$roles = [];
if ($user->getId()) {
$roles[] = 'user:'.$user->getId();
} else {
return [];
}
foreach ($user->getAttribute('memberships', []) as $node) {

View file

@ -1,21 +0,0 @@
<?php
namespace Appwrite\Database;
use Swoole\Coroutine\Channel;
abstract class Pool
{
protected Channel $pool;
protected $available = true;
protected $size = 5;
abstract public function get();
public function destruct()
{
$this->available = false;
while (!$this->pool->isEmpty()) {
$this->pool->pop();
}
}
}

View file

@ -121,6 +121,42 @@ class Realtime
return $this->payload;
}
/**
* Converts the channels from the Query Params into an array.
* Also renames the account channel to account.USER_ID and removes all illegal account channel variations.
* @param array $channels
* @param Document $user
* @return array
*/
public static function convertChannels(array $channels, Document $user): array
{
$channels = array_flip($channels);
foreach ($channels as $key => $value) {
switch (true) {
case strpos($key, 'account.') === 0:
unset($channels[$key]);
break;
case $key === 'account':
if (!empty($user->getId())) {
$channels['account.' . $user->getId()] = $value;
}
unset($channels['account']);
break;
}
}
if (\array_key_exists('account', $channels)) {
if ($user->getId()) {
$channels['account.' . $user->getId()] = $channels['account'];
}
unset($channels['account']);
}
return $channels;
}
/**
* Populate channels array based on the event name and payload.
*

View file

@ -0,0 +1,10 @@
<?php
namespace Appwrite\Messaging;
abstract class Adapter
{
public abstract function subscribe(string $project, mixed $identifier, array $roles, array $channels): void;
public abstract function unsubscribe(mixed $identifier): void;
public abstract function send(string $projectId, string $event, array $payload): void;
}

View file

@ -0,0 +1,164 @@
<?php
namespace Appwrite\Messaging\Adapter;
use Appwrite\Event\Realtime as EventRealtime;
use Appwrite\Messaging\Adapter;
class Realtime extends Adapter
{
/**
* Connection Tree
*
* [CONNECTION_ID] ->
* 'projectId' -> [PROJECT_ID]
* 'roles' -> [ROLE_x, ROLE_Y]
* 'channels' -> [CHANNEL_NAME_X, CHANNEL_NAME_Y, CHANNEL_NAME_Z]
*/
public array $connections = [];
/**
* Subscription Tree
*
* [PROJECT_ID] ->
* [ROLE_X] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
* [ROLE_Y] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
*/
public array $subscriptions = [];
/**
* Adds a subscribtion.
* @param string $projectId Project ID.
* @param mixed $connection Unique Identifier - Connection ID.
* @param array $roles Roles of the Subscription.
* @param array $channels Subscribed Channels.
* @return void
*/
public function subscribe(string $projectId, mixed $connection, array $roles, array $channels): void
{
if (!isset($this->subscriptions[$projectId])) { // Init Project
$this->subscriptions[$projectId] = [];
}
foreach ($roles as $role) {
if (!isset($this->subscriptions[$projectId][$role])) { // Add user first connection
$this->subscriptions[$projectId][$role] = [];
}
foreach ($channels as $channel => $list) {
$this->subscriptions[$projectId][$role][$channel][$connection] = true;
}
}
$this->connections[$connection] = [
'projectId' => $projectId,
'roles' => $roles,
'channels' => $channels
];
}
/**
* Removes Subscription.
*
* @param mixed $connection
* @return void
*/
public function unsubscribe(mixed $connection): void
{
$projectId = $this->connections[$connection]['projectId'] ?? '';
$roles = $this->connections[$connection]['roles'] ?? [];
foreach ($roles as $role) {
foreach ($this->subscriptions[$projectId][$role] as $channel => $list) {
unset($this->subscriptions[$projectId][$role][$channel][$connection]); // Remove connection
if (empty($this->subscriptions[$projectId][$role][$channel])) {
unset($this->subscriptions[$projectId][$role][$channel]); // Remove channel when no connections
}
}
if (empty($this->subscriptions[$projectId][$role])) {
unset($this->subscriptions[$projectId][$role]); // Remove role when no channels
}
}
if (empty($this->subscriptions[$projectId])) { // Remove project when no roles
unset($this->subscriptions[$projectId]);
}
unset($this->connections[$connection]);
}
/**
* Checks if Channel has a subscriber.
* @param string $projectId
* @param string $role
* @param string $channel
* @return bool
*/
public function hasSubscriber(string $projectId, string $role, string $channel = ''): bool
{
if (empty($channel)) {
return array_key_exists($projectId, $this->subscriptions)
&& array_key_exists($role, $this->subscriptions[$projectId]);
}
return array_key_exists($projectId, $this->subscriptions)
&& array_key_exists($role, $this->subscriptions[$projectId])
&& array_key_exists($channel, $this->subscriptions[$projectId][$role]);
}
/**
* Sends an event to the Realtime Server.
* @param string $projectId
* @param string $event
* @param array $payload
* @return void
*/
public function send(string $projectId, string $event, array $payload): void
{
$realtime = new EventRealtime($projectId, $event, $payload);
$realtime->trigger();
}
/**
* Identifies the receivers of all subscriptions, based on the permissions and event.
*
* Example of performance with an event with user:XXX permissions and with X users spread across 10 different channels:
* - 0.014 ms (±6.88%) | 10 Connections / 100 Subscriptions
* - 0.070 ms (±3.71%) | 100 Connections / 1,000 Subscriptions
* - 0.846 ms (±2.74%) | 1,000 Connections / 10,000 Subscriptions
* - 10.866 ms (±1.01%) | 10,000 Connections / 100,000 Subscriptions
* - 110.201 ms (±2.32%) | 100,000 Connections / 1,000,000 Subscriptions
* - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions
*
* @param array $event
*/
public function getReceivers(array $event)
{
$receivers = [];
if (isset($this->subscriptions[$event['project']])) {
foreach ($this->subscriptions[$event['project']] as $role => $subscription) {
foreach ($event['data']['channels'] as $channel) {
if (
\array_key_exists($channel, $this->subscriptions[$event['project']][$role])
&& (\in_array($role, $event['permissions']) || \in_array('*', $event['permissions']))
) {
foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $ids) {
$receivers[$ids] = 0;
}
break;
}
}
}
}
return array_keys($receivers);
}
}

View file

@ -1,202 +0,0 @@
<?php
namespace Appwrite\Realtime;
use Appwrite\Auth\Auth;
use Appwrite\Database\Document;
class Parser
{
/**
* @var Document $user
*/
static $user;
/**
* Sets the current user for the role and channel parsing.
*
* @param Document $user
*/
static function setUser(Document $user)
{
self::$user = $user;
}
/**
* Returns array of roles that the set User has permissions to.
*
* @return array
*/
static function getRoles()
{
if (!isset(self::$user)) {
return [];
}
$roles = ['role:' . ((self::$user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)];
if (!(self::$user->isEmpty())) {
$roles[] = 'user:' . self::$user->getId();
}
foreach (self::$user->getAttribute('memberships', []) as $node) {
if (isset($node['teamId']) && isset($node['roles'])) {
$roles[] = 'team:' . $node['teamId'];
foreach ($node['roles'] as $nodeRole) { // Set all team roles
$roles[] = 'team:' . $node['teamId'] . '/' . $nodeRole;
}
}
}
return $roles;
}
/**
* Converts the channels from the Query Params into an array.
* Also renames the account channel to account.USER_ID and removes all illegal account channel variations.
*
* @param array $channels
*/
static function parseChannels(array $channels)
{
$channels = array_flip($channels);
foreach ($channels as $key => $value) {
switch (true) {
case strpos($key, 'account.') === 0:
unset($channels[$key]);
break;
case $key === 'account':
if (!empty(self::$user->getId())) {
$channels['account.' . self::$user->getId()] = $value;
}
unset($channels['account']);
break;
}
}
if (\array_key_exists('account', $channels)) {
if (self::$user->getId()) {
$channels['account.' . self::$user->getId()] = $channels['account'];
}
unset($channels['account']);
}
return $channels;
}
/**
* Identifies the receivers of all subscriptions, based on the permissions and event.
*
* Example of performance with an event with user:XXX permissions and with X users spread across 10 different channels:
* - 0.014 ms (±6.88%) | 10 Connections / 100 Subscriptions
* - 0.070 ms (±3.71%) | 100 Connections / 1,000 Subscriptions
* - 0.846 ms (±2.74%) | 1,000 Connections / 10,000 Subscriptions
* - 10.866 ms (±1.01%) | 10,000 Connections / 100,000 Subscriptions
* - 110.201 ms (±2.32%) | 100,000 Connections / 1,000,000 Subscriptions
* - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions
*
* @param array $event
* @param array $connections
* @param array $subscriptions
*/
static function identifyReceivers(array &$event, array &$subscriptions)
{
$receivers = [];
if (isset($subscriptions[$event['project']])) {
foreach ($subscriptions[$event['project']] as $role => $subscription) {
foreach ($event['data']['channels'] as $channel) {
if (
\array_key_exists($channel, $subscriptions[$event['project']][$role])
&& (\in_array($role, $event['permissions']) || \in_array('*', $event['permissions']))
) {
foreach (array_keys($subscriptions[$event['project']][$role][$channel]) as $ids) {
$receivers[$ids] = 0;
}
break;
}
}
}
}
return array_keys($receivers);
}
/**
* Adds Subscription.
*
* @param string $projectId
* @param mixed $connection
* @param array $subscriptions
* @param array $roles
* @param array $channels
*/
static function subscribe($projectId, $connection, $roles, &$subscriptions, &$connections, &$channels)
{
/**
* Build Subscriptions Tree
*
* [PROJECT_ID] ->
* [ROLE_X] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
* [ROLE_Y] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
*/
if (!isset($subscriptions[$projectId])) { // Init Project
$subscriptions[$projectId] = [];
}
foreach ($roles as $role) {
if (!isset($subscriptions[$projectId][$role])) { // Add user first connection
$subscriptions[$projectId][$role] = [];
}
foreach ($channels as $channel => $list) {
$subscriptions[$projectId][$role][$channel][$connection] = true;
}
}
$connections[$connection] = [
'projectId' => $projectId,
'roles' => $roles,
'channels' => $channels
];
}
/**
* Remove Subscription.
*
* @param mixed $connection
* @param array $subscriptions
* @param array $connections
*/
static function unsubscribe($connection, &$subscriptions, &$connections)
{
$projectId = $connections[$connection]['projectId'] ?? '';
$roles = $connections[$connection]['roles'] ?? [];
foreach ($roles as $role) {
foreach ($subscriptions[$projectId][$role] as $channel => $list) {
unset($subscriptions[$projectId][$role][$channel][$connection]); // Remove connection
if (empty($subscriptions[$projectId][$role][$channel])) {
unset($subscriptions[$projectId][$role][$channel]); // Remove channel when no connections
}
}
if (empty($subscriptions[$projectId][$role])) {
unset($subscriptions[$projectId][$role]); // Remove role when no channels
}
}
if (empty($subscriptions[$projectId])) { // Remove project when no roles
unset($subscriptions[$projectId]);
}
unset($connections[$connection]);
}
}