1
0
Fork 0
mirror of synced 2024-05-20 12:42:39 +12:00

Fixed code to work with 0.7 changes

This commit is contained in:
Eldad Fux 2021-02-21 23:22:32 +02:00
parent 23ed7284fa
commit 780841853e
3 changed files with 74 additions and 28 deletions

View file

@ -37,6 +37,7 @@ App::get('/v1/health/realtime')
->desc('Get Realtime')
->groups(['api', 'health'])
->label('scope', 'public')
->inject('response')
->action(function ($response) {
/** @var Utopia\Response $response */
$redis = new Redis();
@ -44,7 +45,7 @@ App::get('/v1/health/realtime')
$redis->publish('realtime', 'I\'m a live message');
$response->json(['status' => 'OK']);
}, ['response']);
});
App::get('/v1/health/db')
->desc('Get DB')

View file

@ -4,14 +4,14 @@ require_once __DIR__.'/init.php';
require_once __DIR__.'/../vendor/autoload.php';
use Appwrite\Auth\Auth;
use Appwrite\Swoole\Request as SwooleRequest;
use Appwrite\Database\Document;
use Swoole\WebSocket\Server;
use Swoole\Http\Request;
use Swoole\Process;
use Swoole\WebSocket\Frame;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Route;
use Utopia\Swoole\Request as SwooleRequest;
/**
* TODO List
@ -36,7 +36,7 @@ $server = new Server("0.0.0.0", 80);
$subscriptions = [];
$connections = [];
$server->on("workerStart", function ($server, $workerId) use (&$subscriptions) {
$server->on("workerStart", function ($server, $workerId) use (&$subscriptions, &$connections) {
Console::success('Worker '.++$workerId.' started succefully');
$attempts = 0;
@ -62,22 +62,37 @@ $server->on("workerStart", function ($server, $workerId) use (&$subscriptions) {
Console::error('Pub/sub failed (worker: '.$workerId.')');
}
$redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId) {
$redis->subscribe(['realtime'], function($redis, $channel, $message) use ($server, $workerId, &$connections) {
$message = 'Message from worker #'.$workerId.'; '.$message;
// foreach($connections as $fd) {
// if ($server->exist($fd)
// && $server->isEstablished($fd)
// ) {
// Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')');
// $server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT,
// SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
// }
// else {
// $server->close($fd);
// }
// }
// TODO get project and resource ID and itterate over the resource read(?) permissions and send a message to all listeners
/**
* Supported Resources:
* - Collection
* - Document
* - Bucket
* - File
* - User? / Account? (no permissions)
* - Session? (no permissions)
* - Team? (no permissions)
* - Membership? (no permissions)
* - Function
* - Execution
*/
foreach($connections as $fd => $connection) {
if ($server->exist($fd)
&& $server->isEstablished($fd)
) {
Console::info('Sending message: '.$message.' (user: '.$fd.', worker: '.$workerId.')');
$server->push($fd, $message, SWOOLE_WEBSOCKET_OPCODE_TEXT,
SWOOLE_WEBSOCKET_FLAG_FIN | SWOOLE_WEBSOCKET_FLAG_COMPRESS);
}
else {
$server->close($fd);
}
}
});
} catch (\Throwable $th) {
@ -119,19 +134,23 @@ $server->on('open', function(Server $server, Request $request) use (&$connection
return null;
});
App::setResource('project', function () { // TODO get project from query string
return new Document();
});
App::setResource('user', function () { // TODO get user with JWT token
return new Document();
});
$channels = array_flip($request->getQuery('channels', []));
$jwt = $request->getQuery('jwt', '');
$user = $app->getResource('user');
$project = $app->getResource('project');
$roles = ['user:'.$user->getId(), 'role:'.(($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)];
$roles = ['*', 'user:'.$user->getId(), 'role:'.(($user->isEmpty()) ? Auth::USER_ROLE_GUEST : Auth::USER_ROLE_MEMBER)];
/** @var Appwrite\Database\Document $user */
/** @var Appwrite\Database\Document $project */
var_dump($project->getId());
var_dump($project->getAttribute('name'));
var_dump($user->getId());
var_dump($user->getAttribute('name'));
\array_map(function ($node) use (&$roles) {
if (isset($node['teamId']) && isset($node['roles'])) {
$roles[] = 'team:'.$node['teamId'];
@ -142,6 +161,20 @@ $server->on('open', function(Server $server, Request $request) use (&$connection
}
}, $user->getAttribute('memberships', []));
/**
* Build Subscriptions Tree
*
* [PROJECT_ID] ->
* [ROLE_X] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
* [ROLE_Y] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
*/
if(!isset($subscriptions[$project->getId()])) { // Init Project
$subscriptions[$project->getId()] = [];
}
@ -161,6 +194,11 @@ $server->on('open', function(Server $server, Request $request) use (&$connection
'roles' => $roles,
];
var_dump($project->getId());
var_dump($project->getAttribute('name'));
var_dump($user->getId());
var_dump($user->getAttribute('name'));
$server->push($connection, json_encode($subscriptions));
});
@ -175,8 +213,6 @@ $server->on('message', function(Server $server, Frame $frame) {
});
$server->on('close', function(Server $server, int $fd) use (&$connections, &$subscriptions) {
Console::error('Connection close: '.$fd);
$projectId = $connections[$fd]['projectId'] ?? '';
$roles = $connections[$fd]['roles'] ?? [];
@ -185,17 +221,23 @@ $server->on('close', function(Server $server, int $fd) use (&$connections, &$sub
unset($subscriptions[$projectId][$role][$channel][$fd]); // Remove connection
if(empty($subscriptions[$projectId][$role][$channel])) {
unset($subscriptions[$projectId][$role][$channel]); // Remove channel
unset($subscriptions[$projectId][$role][$channel]); // Remove channel when no connections
}
}
if(empty($subscriptions[$projectId][$role])) {
unset($subscriptions[$projectId][$role]); // Remove role
unset($subscriptions[$projectId][$role]); // Remove role when no channels
}
}
if(empty($subscriptions[$projectId])) { // Remove project when no roles
unset($subscriptions[$projectId]);
}
unset($connections[$fd]);
Console::info('Connection close: '.$fd);
var_dump($subscriptions);
});

View file

@ -145,6 +145,9 @@ services:
- traefik.http.routers.appwrite_realtime_wss.tls.certresolver=dns
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
environment: