add realtime prototype
This commit is contained in:
parent
f4f0f32468
commit
46f64b4faa
|
@ -143,6 +143,7 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits
|
|||
|
||||
$realtime
|
||||
->setEvent($events->getParam('event'))
|
||||
->setProject($project->getId())
|
||||
->setPayload($response->getPayload())
|
||||
->trigger();
|
||||
|
||||
|
|
|
@ -323,7 +323,7 @@ App::setResource('events', function($register) {
|
|||
}, ['register']);
|
||||
|
||||
App::setResource('realtime', function($register) {
|
||||
return new Realtime('', []);
|
||||
return new Realtime('', '', []);
|
||||
}, ['register']);
|
||||
|
||||
App::setResource('audits', function($register) {
|
||||
|
|
225
app/realtime.php
225
app/realtime.php
|
@ -1,18 +1,29 @@
|
|||
<?php
|
||||
|
||||
require_once __DIR__.'/init.php';
|
||||
require_once __DIR__.'/../vendor/autoload.php';
|
||||
require_once __DIR__ . '/init.php';
|
||||
require_once __DIR__ . '/../vendor/autoload.php';
|
||||
|
||||
use Appwrite\Auth\Auth;
|
||||
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
|
||||
use Appwrite\Database\Adapter\Redis as RedisAdapter;
|
||||
use Appwrite\Database\Database;
|
||||
use Appwrite\Database\Document;
|
||||
use Appwrite\Database\Validator\Authorization;
|
||||
use Appwrite\Extend\PDO;
|
||||
use Swoole\WebSocket\Server;
|
||||
use Swoole\Http\Request;
|
||||
use Swoole\Process;
|
||||
use Swoole\WebSocket\Frame;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\Swoole\Request as SwooleRequest;
|
||||
|
||||
use PDO as PDONative;
|
||||
use Swoole\Database\RedisConfig;
|
||||
use Swoole\Database\RedisPool;
|
||||
|
||||
/**
|
||||
* TODO List
|
||||
*
|
||||
|
@ -33,38 +44,77 @@ ini_set('default_socket_timeout', -1);
|
|||
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||
|
||||
$server = new Server("0.0.0.0", 80);
|
||||
$server->set([
|
||||
'worker_num' => 1
|
||||
]);
|
||||
$subscriptions = [];
|
||||
$connections = [];
|
||||
|
||||
$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections) {
|
||||
Console::success('Worker '.++$workerId.' started succefully');
|
||||
$register = new Registry();
|
||||
|
||||
$register->set('db', function () { // Register DB connection
|
||||
$dbHost = App::getEnv('_APP_DB_HOST', '');
|
||||
$dbUser = App::getEnv('_APP_DB_USER', '');
|
||||
$dbPass = App::getEnv('_APP_DB_PASS', '');
|
||||
$dbScheme = App::getEnv('_APP_DB_SCHEMA', '');
|
||||
|
||||
$pdo = new PDO("mysql:host={$dbHost};dbname={$dbScheme};charset=utf8mb4", $dbUser, $dbPass, array(
|
||||
PDONative::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4',
|
||||
PDONative::ATTR_TIMEOUT => 3, // Seconds
|
||||
PDONative::ATTR_PERSISTENT => true
|
||||
));
|
||||
|
||||
// Connection settings
|
||||
$pdo->setAttribute(PDONative::ATTR_DEFAULT_FETCH_MODE, PDONative::FETCH_ASSOC); // Return arrays
|
||||
$pdo->setAttribute(PDONative::ATTR_ERRMODE, PDONative::ERRMODE_EXCEPTION); // Handle all errors with exceptions
|
||||
|
||||
return $pdo;
|
||||
});
|
||||
|
||||
$register->set('cache', function () { // Register cache connection
|
||||
$redis = new Redis();
|
||||
$redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
|
||||
$user = App::getEnv('_APP_REDIS_USER','');
|
||||
$pass = App::getEnv('_APP_REDIS_PASS','');
|
||||
$auth = [];
|
||||
if(!empty($user)) {
|
||||
$auth["user"] = $user;
|
||||
}
|
||||
if(!empty($pass)) {
|
||||
$auth["pass"] = $pass;
|
||||
}
|
||||
if(!empty($auth)) {
|
||||
$redis->auth($auth);
|
||||
}
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
return $redis;
|
||||
});
|
||||
|
||||
$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections, &$register) {
|
||||
Console::success('Worker ' . ++$workerId . ' started succefully');
|
||||
|
||||
$attempts = 0;
|
||||
$start = time();
|
||||
|
||||
|
||||
while ($attempts < 300) {
|
||||
try {
|
||||
if($attempts > 0) {
|
||||
Console::error('Pub/sub connection lost (lasted '.(time() - $start).' seconds, worker: '.$workerId.').
|
||||
Attempting restart in 5 seconds (attempt #'.$attempts.')');
|
||||
if ($attempts > 0) {
|
||||
Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ').
|
||||
Attempting restart in 5 seconds (attempt #' . $attempts . ')');
|
||||
sleep(5); // 1 sec delay between connection attempts
|
||||
}
|
||||
|
||||
$redis = new Redis();
|
||||
$redis->connect('redis', 6379);
|
||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||
$redis = $register->get('cache');
|
||||
|
||||
if($redis->ping(true)) {
|
||||
if ($redis->ping(true)) {
|
||||
$attempts = 0;
|
||||
Console::success('Pub/sub connection established (worker: '.$workerId.')');
|
||||
}
|
||||
else {
|
||||
Console::error('Pub/sub failed (worker: '.$workerId.')');
|
||||
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
|
||||
} else {
|
||||
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
|
||||
}
|
||||
|
||||
$redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) {
|
||||
$message = 'Message from worker #'.$workerId.'; '.$message;
|
||||
|
||||
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, &$connections, &$subscriptions) {
|
||||
// TODO get project and resource ID and itterate over the resource read(?) permissions and send a message to all listeners
|
||||
|
||||
/**
|
||||
|
@ -80,23 +130,43 @@ $server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &
|
|||
* - Function
|
||||
* - Execution
|
||||
*/
|
||||
|
||||
foreach($connections as $fd => $connection) {
|
||||
if ($server->exist($fd)
|
||||
&& $server->isEstablished($fd)
|
||||
$event = json_decode($payload);
|
||||
|
||||
$receivers = [];
|
||||
|
||||
foreach ($connections as $fd => $connection) {
|
||||
if ($connection['projectId'] !== $event->project) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($connection['roles'] as $role) {
|
||||
if (\array_key_exists($role, $subscriptions[$event->project])) {
|
||||
foreach ($event->channels as $channel) {
|
||||
if (\array_key_exists($channel, $subscriptions[$event->project][$role]) && \in_array($role, $event->permissions)) {
|
||||
$receivers = array_merge($receivers, array_keys($subscriptions[$event->project][$role][$channel]));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$receivers = array_keys(array_flip($receivers));
|
||||
|
||||
foreach ($receivers as $receiver) {
|
||||
if ($server->exist($receiver)
|
||||
&& $server->isEstablished($receiver)
|
||||
) {
|
||||
Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')');
|
||||
$server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT,
|
||||
$server->push($receiver, json_encode($event->data), SWOOLE_WEBSOCKET_OPCODE_TEXT,
|
||||
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
|
||||
}
|
||||
else {
|
||||
$server->close($fd);
|
||||
$server->close($receiver);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
} catch (\Throwable $th) {
|
||||
Console::error('Pub/sub error: '.$th->getMessage());
|
||||
Console::error('Pub/sub error: ' . $th->getMessage());
|
||||
$attempts++;
|
||||
continue;
|
||||
}
|
||||
|
@ -119,7 +189,7 @@ $server->on("start", function (Server $server) {
|
|||
});
|
||||
});
|
||||
|
||||
$server->on('open', function(Server $server, Request $request) use (&$connections, &$subscriptions) {
|
||||
$server->on('open', function (Server $server, Request $request) use (&$connections, &$subscriptions, &$register) {
|
||||
Console::info("Connection open (user: {$request->fd}, worker: {$server->getWorkerId()})");
|
||||
|
||||
$app = new App('');
|
||||
|
@ -130,33 +200,74 @@ $server->on('open', function(Server $server, Request $request) use (&$connection
|
|||
return $request;
|
||||
});
|
||||
|
||||
App::setResource('response', function () {
|
||||
return null;
|
||||
});
|
||||
App::setResource('consoleDB', function () use (&$register) {
|
||||
$consoleDB = new Database();
|
||||
$consoleDB->setAdapter(new MySQLAdapter($register)); // TODO: Add Redis
|
||||
$consoleDB->setNamespace('app_console'); // Should be replaced with param if we want to have parent projects
|
||||
$consoleDB->setMocks(Config::getParam('collections', []));
|
||||
|
||||
App::setResource('project', function () { // TODO get project from query string
|
||||
return new Document();
|
||||
});
|
||||
return $consoleDB;
|
||||
}, ['register']);
|
||||
|
||||
App::setResource('user', function () { // TODO get user with JWT token
|
||||
return new Document();
|
||||
});
|
||||
App::setResource('project', function ($consoleDB, $request) {
|
||||
/** @var Utopia\Swoole\Request $request */
|
||||
/** @var Appwrite\Database\Database $consoleDB */
|
||||
|
||||
Authorization::disable();
|
||||
|
||||
$project = $consoleDB->getDocument($request->getQuery('project'));
|
||||
|
||||
Authorization::reset();
|
||||
|
||||
return $project;
|
||||
}, ['consoleDB', 'request']);
|
||||
|
||||
App::setResource('user', function ($project, $request, $projectDB) {
|
||||
/** @var Utopia\Swoole\Request $request */
|
||||
/** @var Appwrite\Database\Document $project */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
|
||||
Authorization::setDefaultStatus(true);
|
||||
|
||||
Auth::setCookieName('a_session_' . $project->getId());
|
||||
|
||||
$session = Auth::decodeSession(
|
||||
$request->getCookie(
|
||||
Auth::$cookieName, // Get sessions
|
||||
$request->getCookie(Auth::$cookieName . '_legacy', '')
|
||||
)
|
||||
); // Get fallback session from old clients (no SameSite support)
|
||||
|
||||
Auth::$unique = $session['id'];
|
||||
Auth::$secret = $session['secret'];
|
||||
|
||||
$user = $projectDB->getDocument(Auth::$unique);
|
||||
|
||||
if (
|
||||
empty($user->getId()) // Check a document has been found in the DB
|
||||
|| Database::SYSTEM_COLLECTION_USERS !== $user->getCollection() // Validate returned document is really a user document
|
||||
|| !Auth::tokenVerify($user->getAttribute('tokens', []), Auth::TOKEN_TYPE_LOGIN, Auth::$secret)
|
||||
) { // Validate user has valid login token
|
||||
$user = new Document(['$id' => '', '$collection' => Database::SYSTEM_COLLECTION_USERS]);
|
||||
}
|
||||
|
||||
return $user;
|
||||
}, ['project', 'request', 'projectDB']);
|
||||
|
||||
$channels = array_flip($request->getQuery('channels', []));
|
||||
$jwt = $request->getQuery('jwt', '');
|
||||
$user = $app->getResource('user');
|
||||
$project = $app->getResource('project');
|
||||
$roles = ['*', 'user:'.$user->getId(), 'role:'.(($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)];
|
||||
$roles = ['*', 'user:' . $user->getId(), 'role:' . (($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)];
|
||||
|
||||
/** @var Appwrite\Database\Document $user */
|
||||
/** @var Appwrite\Database\Document $project */
|
||||
|
||||
\array_map(function ($node) use (&$roles) {
|
||||
if (isset($node['teamId']) && isset($node['roles'])) {
|
||||
$roles[] = 'team:'.$node['teamId'];
|
||||
$roles[] = 'team:' . $node['teamId'];
|
||||
|
||||
foreach ($node['roles'] as $nodeRole) { // Set all team roles
|
||||
$roles[] = 'team:'.$node['teamId'].'/'.$nodeRole;
|
||||
$roles[] = 'team:' . $node['teamId'] . '/' . $nodeRole;
|
||||
}
|
||||
}
|
||||
}, $user->getAttribute('memberships', []));
|
||||
|
@ -175,15 +286,15 @@ $server->on('open', function(Server $server, Request $request) use (&$connection
|
|||
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
|
||||
*/
|
||||
|
||||
if(!isset($subscriptions[$project->getId()])) { // Init Project
|
||||
if (!isset($subscriptions[$project->getId()])) { // Init Project
|
||||
$subscriptions[$project->getId()] = [];
|
||||
}
|
||||
|
||||
foreach ($roles as $key => $role) {
|
||||
if(!isset($subscriptions[$project->getId()][$role])) { // Add user first connection
|
||||
if (!isset($subscriptions[$project->getId()][$role])) { // Add user first connection
|
||||
$subscriptions[$project->getId()][$role] = [];
|
||||
}
|
||||
|
||||
|
||||
foreach ($channels as $channel => $list) {
|
||||
$subscriptions[$project->getId()][$role][$channel][$connection] = true;
|
||||
}
|
||||
|
@ -194,25 +305,18 @@ $server->on('open', function(Server $server, Request $request) use (&$connection
|
|||
'roles' => $roles,
|
||||
];
|
||||
|
||||
var_dump($project->getId());
|
||||
var_dump($project->getAttribute('name'));
|
||||
var_dump($user->getId());
|
||||
var_dump($user->getAttribute('name'));
|
||||
|
||||
$server->push($connection, json_encode($subscriptions));
|
||||
});
|
||||
|
||||
$server->on('message', function(Server $server, Frame $frame) {
|
||||
if($frame->data === 'reload') {
|
||||
$server->on('message', function (Server $server, Frame $frame) use (&$connections, &$subscriptions) {
|
||||
if ($frame->data === 'reload') {
|
||||
$server->reload();
|
||||
}
|
||||
|
||||
Console::info('Recieved message: '.$frame->data.' (user: '.$frame->fd.', worker: '.$server->getWorkerId().')');
|
||||
|
||||
$server->push($frame->fd, json_encode(["hello, worker_id:".$server->getWorkerId(), time()]));
|
||||
Console::info('Recieved message: ' . $frame->data . ' (user: ' . $frame->fd . ', worker: ' . $server->getWorkerId() . ')');
|
||||
});
|
||||
|
||||
$server->on('close', function(Server $server, int $fd) use (&$connections, &$subscriptions) {
|
||||
$server->on('close', function (Server $server, int $fd) use (&$connections, &$subscriptions) {
|
||||
$projectId = $connections[$fd]['projectId'] ?? '';
|
||||
$roles = $connections[$fd]['roles'] ?? [];
|
||||
|
||||
|
@ -220,25 +324,24 @@ $server->on('close', function(Server $server, int $fd) use (&$connections, &$sub
|
|||
foreach ($subscriptions[$projectId][$role] as $channel => $list) {
|
||||
unset($subscriptions[$projectId][$role][$channel][$fd]); // Remove connection
|
||||
|
||||
if(empty($subscriptions[$projectId][$role][$channel])) {
|
||||
if (empty($subscriptions[$projectId][$role][$channel])) {
|
||||
unset($subscriptions[$projectId][$role][$channel]); // Remove channel when no connections
|
||||
}
|
||||
}
|
||||
|
||||
if(empty($subscriptions[$projectId][$role])) {
|
||||
if (empty($subscriptions[$projectId][$role])) {
|
||||
unset($subscriptions[$projectId][$role]); // Remove role when no channels
|
||||
}
|
||||
}
|
||||
|
||||
if(empty($subscriptions[$projectId])) { // Remove project when no roles
|
||||
if (empty($subscriptions[$projectId])) { // Remove project when no roles
|
||||
unset($subscriptions[$projectId]);
|
||||
}
|
||||
|
||||
unset($connections[$fd]);
|
||||
|
||||
Console::info('Connection close: '.$fd);
|
||||
Console::info('Connection close: ' . $fd);
|
||||
|
||||
var_dump($subscriptions);
|
||||
});
|
||||
|
||||
$server->start();
|
|
@ -7,6 +7,11 @@ use Utopia\App;
|
|||
|
||||
class Realtime
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $project = '';
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
|
@ -26,15 +31,35 @@ class Realtime
|
|||
/**
|
||||
* Event constructor.
|
||||
*
|
||||
* @param string $project
|
||||
* @param string $event
|
||||
* @param array $payload
|
||||
*/
|
||||
public function __construct(string $event, array $payload)
|
||||
public function __construct(string $project, string $event, array $payload)
|
||||
{
|
||||
$this->project = $project;
|
||||
$this->event = $event;
|
||||
$this->payload = new Document($payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $project
|
||||
* return $this
|
||||
*/
|
||||
public function setProject(string $project): self
|
||||
{
|
||||
$this->project = $project;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getProject(): string
|
||||
{
|
||||
return $this->project;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $event
|
||||
* return $this
|
||||
|
@ -115,11 +140,13 @@ class Realtime
|
|||
$redis = new \Redis();
|
||||
$redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
|
||||
$redis->publish('realtime', json_encode([
|
||||
'project' => $this->project,
|
||||
'channels' => $this->channels,
|
||||
'permissions' => $this->payload->getAttribute('$permissions.read'),
|
||||
'data' => [
|
||||
'event' => $this->event,
|
||||
'timestamp' => time(),
|
||||
'payload' => $this->payload
|
||||
'payload' => $this->payload->getArrayCopy()
|
||||
]
|
||||
]));
|
||||
|
||||
|
|
Loading…
Reference in a new issue