1
0
Fork 0
mirror of synced 2024-09-29 08:51:28 +13:00

Merge branch 'feat-265-realtime' of https://github.com/appwrite/appwrite into feat-265-realtime-console

This commit is contained in:
Torsten Dittmann 2021-06-18 12:00:55 +02:00
commit 7e53800a3e
9 changed files with 233 additions and 36 deletions

View file

@ -37,10 +37,12 @@ App::post('/v1/teams')
->inject('response')
->inject('user')
->inject('projectDB')
->action(function ($name, $roles, $response, $user, $projectDB) {
->inject('events')
->action(function ($name, $roles, $response, $user, $projectDB, $events) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Document $user */
/** @var Appwrite\Database\Database $projectDB */
/** @var Appwrite\Event\Event $events */
Authorization::disable();
@ -90,6 +92,10 @@ App::post('/v1/teams')
}
}
if (!empty($user->getId())) {
$events->setParam('userId', $user->getId());
}
$response
->setStatusCode(Response::STATUS_CODE_CREATED)
->dynamic($team, Response::MODEL_TEAM)

View file

@ -201,6 +201,7 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits
if ($project->getId() !== 'console') {
$realtime
->setEvent($events->getParam('event'))
->setUserId($events->getParam('userId'))
->setProject($project->getId())
->setPayload($response->getPayload())
->trigger();

View file

@ -1,6 +1,7 @@
<?php
use Appwrite\Realtime\Server;
use Utopia\App;
require_once __DIR__ . '/init.php';
@ -10,4 +11,4 @@ $config = [
'package_max_length' => 64000 // Default maximum Package Size (64kb)
];
$realtimeServer = new Server($register, config: $config);
$realtimeServer = new Server($register, port: App::getEnv('PORT', 80), config: $config);

View file

@ -8,7 +8,6 @@ require_once __DIR__.'/../workers.php';
Console::title('Webhooks V1 Worker');
Console::success(APP_NAME.' webhooks worker v1 has started');
use Appwrite\Resque\Worker;
class WebhooksV1 extends Worker
{
@ -92,4 +91,4 @@ class WebhooksV1 extends Worker
public function shutdown(): void
{
}
}
}

36
composer.lock generated
View file

@ -4823,16 +4823,16 @@
},
{
"name": "sebastian/type",
"version": "2.3.2",
"version": "2.3.4",
"source": {
"type": "git",
"url": "https://github.com/sebastianbergmann/type.git",
"reference": "0d1c587401514d17e8f9258a27e23527cb1b06c1"
"reference": "b8cd8a1c753c90bc1a0f5372170e3e489136f914"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/sebastianbergmann/type/zipball/0d1c587401514d17e8f9258a27e23527cb1b06c1",
"reference": "0d1c587401514d17e8f9258a27e23527cb1b06c1",
"url": "https://api.github.com/repos/sebastianbergmann/type/zipball/b8cd8a1c753c90bc1a0f5372170e3e489136f914",
"reference": "b8cd8a1c753c90bc1a0f5372170e3e489136f914",
"shasum": ""
},
"require": {
@ -4867,7 +4867,7 @@
"homepage": "https://github.com/sebastianbergmann/type",
"support": {
"issues": "https://github.com/sebastianbergmann/type/issues",
"source": "https://github.com/sebastianbergmann/type/tree/2.3.2"
"source": "https://github.com/sebastianbergmann/type/tree/2.3.4"
},
"funding": [
{
@ -4875,7 +4875,7 @@
"type": "github"
}
],
"time": "2021-06-04T13:02:07+00:00"
"time": "2021-06-15T12:49:02+00:00"
},
{
"name": "sebastian/version",
@ -4984,16 +4984,16 @@
},
{
"name": "symfony/console",
"version": "v5.3.0",
"version": "v5.3.2",
"source": {
"type": "git",
"url": "https://github.com/symfony/console.git",
"reference": "058553870f7809087fa80fa734704a21b9bcaeb2"
"reference": "649730483885ff2ca99ca0560ef0e5f6b03f2ac1"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/console/zipball/058553870f7809087fa80fa734704a21b9bcaeb2",
"reference": "058553870f7809087fa80fa734704a21b9bcaeb2",
"url": "https://api.github.com/repos/symfony/console/zipball/649730483885ff2ca99ca0560ef0e5f6b03f2ac1",
"reference": "649730483885ff2ca99ca0560ef0e5f6b03f2ac1",
"shasum": ""
},
"require": {
@ -5062,7 +5062,7 @@
"terminal"
],
"support": {
"source": "https://github.com/symfony/console/tree/v5.3.0"
"source": "https://github.com/symfony/console/tree/v5.3.2"
},
"funding": [
{
@ -5078,7 +5078,7 @@
"type": "tidelift"
}
],
"time": "2021-05-26T17:43:10+00:00"
"time": "2021-06-12T09:42:48+00:00"
},
{
"name": "symfony/deprecation-contracts",
@ -5635,16 +5635,16 @@
},
{
"name": "symfony/string",
"version": "v5.3.0",
"version": "v5.3.2",
"source": {
"type": "git",
"url": "https://github.com/symfony/string.git",
"reference": "a9a0f8b6aafc5d2d1c116dcccd1573a95153515b"
"reference": "0732e97e41c0a590f77e231afc16a327375d50b0"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/string/zipball/a9a0f8b6aafc5d2d1c116dcccd1573a95153515b",
"reference": "a9a0f8b6aafc5d2d1c116dcccd1573a95153515b",
"url": "https://api.github.com/repos/symfony/string/zipball/0732e97e41c0a590f77e231afc16a327375d50b0",
"reference": "0732e97e41c0a590f77e231afc16a327375d50b0",
"shasum": ""
},
"require": {
@ -5698,7 +5698,7 @@
"utf8"
],
"support": {
"source": "https://github.com/symfony/string/tree/v5.3.0"
"source": "https://github.com/symfony/string/tree/v5.3.2"
},
"funding": [
{
@ -5714,7 +5714,7 @@
"type": "tidelift"
}
],
"time": "2021-05-26T17:43:10+00:00"
"time": "2021-06-06T09:51:56+00:00"
},
{
"name": "textalk/websocket",

View file

@ -17,6 +17,11 @@ class Realtime
*/
protected $event = '';
/**
* @var string
*/
protected $userId = '';
/**
* @var array
*/
@ -27,6 +32,11 @@ class Realtime
*/
protected $permissions = [];
/**
* @var false
*/
protected $permissionsChanged = false;
/**
* @var Document
*/
@ -57,6 +67,16 @@ class Realtime
return $this;
}
/**
* @param string $userId
* return $this
*/
public function setUserId(string $userId): self
{
$this->userId = $userId;
return $this;
}
/**
* @return string
*/
@ -120,6 +140,20 @@ class Realtime
$this->channels[] = 'account.' . $this->payload->getId();
$this->permissions = ['user:' . $this->payload->getId()];
break;
case strpos($this->event, 'teams.memberships') === 0:
$this->permissionsChanged = in_array($this->event, ['teams.memberships.update', 'teams.memberships.delete', 'teams.memberships.update.status']);
$this->channels[] = 'memberships';
$this->channels[] = 'memberships.' . $this->payload->getId();
$this->permissions = ['team:' . $this->payload->getAttribute('teamId')];
break;
case strpos($this->event, 'teams.') === 0:
$this->permissionsChanged = $this->event === 'teams.create';
$this->channels[] = 'teams';
$this->channels[] = 'teams.' . $this->payload->getId();
$this->permissions = ['team:' . $this->payload->getId()];
break;
case strpos($this->event, 'database.collections.') === 0:
$this->channels[] = 'collections';
@ -148,7 +182,7 @@ class Realtime
$this->permissions = $this->payload->getAttribute('$permissions.read');
}
break;
}
}
}
/**
@ -166,6 +200,8 @@ class Realtime
$redis->publish('realtime', json_encode([
'project' => $this->project,
'permissions' => $this->permissions,
'permissionsChanged' => $this->permissionsChanged,
'userId' => $this->userId,
'data' => [
'event' => $this->event,
'channels' => $this->channels,

View file

@ -163,6 +163,7 @@ class Parser
$connections[$connection] = [
'projectId' => $projectId,
'roles' => $roles,
'channels' => $channels
];
}

View file

@ -2,6 +2,9 @@
namespace Appwrite\Realtime;
use Appwrite\Database\Database;
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
use Appwrite\Database\Adapter\Redis as RedisAdapter;
use Appwrite\Event\Event;
use Appwrite\Network\Validator\Origin;
use Appwrite\Utopia\Response;
@ -17,6 +20,7 @@ use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Exception as UtopiaException;
use Utopia\Registry\Registry;
use Utopia\Swoole\Request as SwooleRequest;
@ -176,7 +180,7 @@ class Server
return $db;
});
$this->register->set('cache', function () use (&$redis) { // Register cache connection
$this->register->set('cache', function () use (&$redis) {
return $redis;
});
@ -318,20 +322,12 @@ class Server
*/
public function onRedisPublish(string $payload, SwooleServer &$server, int $workerId)
{
/**
* Supported Resources:
* - Collection
* - Document
* - File
* - Account
* - Session
* - Team? (not implemented yet)
* - Membership? (not implemented yet)
* - Function
* - Execution
*/
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {
$this->addPermission($event);
}
$receivers = Parser::identifyReceivers($event, $this->subscriptions);
// Temporarily print debug logs by default for Alpha testing.
@ -390,4 +386,43 @@ class Server
}
}
}
private function addPermission(array $event)
{
$project = $event['project'];
$userId = $event['userId'];
if (array_key_exists($project, $this->subscriptions) && array_key_exists('user:'.$userId, $this->subscriptions[$project])) {
$connection = array_key_first(reset($this->subscriptions[$project]['user:'.$userId]));
} else {
return;
}
/**
* This is redundant soon and will be gone with merging the usage branch.
*/
$db = $this->register->get('dbPool')->get();
$redis = $this->register->get('redisPool')->get();
$this->register->set('db', function () use (&$db) {
return $db;
});
$this->register->set('cache', function () use (&$redis) {
return $redis;
});
$projectDB = new Database();
$projectDB->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register));
$projectDB->setNamespace('app_'.$project);
$projectDB->setMocks(Config::getParam('collections', []));
$user = $projectDB->getDocument($userId);
Parser::setUser($user);
$roles = Parser::getRoles();
Parser::subscribe($project, $connection, $roles, $this->subscriptions, $this->connections, $this->connections[$connection]['channels']);
}
}

View file

@ -686,4 +686,122 @@ trait RealtimeBase
$client->close();
}
public function testChannelTeams(): array
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = $this->getProject()['$id'];
$client = $this->getWebsocket(['teams'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_'.$projectId.'=' . $session
]);
$response = json_decode($client->receive(), true);
$this->assertCount(1, $response);
$this->assertArrayHasKey('teams', $response);
/**
* Test Team Create
*/
$team = $this->client->call(Client::METHOD_POST, '/teams', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'name' => 'Arsenal'
]);
$teamId = $team['body']['$id'] ?? '';
$this->assertEquals(201, $team['headers']['status-code']);
$this->assertNotEmpty($team['body']['$id']);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('timestamp', $response);
$this->assertCount(2, $response['channels']);
$this->assertContains('teams', $response['channels']);
$this->assertContains('teams.' . $teamId, $response['channels']);
$this->assertEquals('teams.create', $response['event']);
$this->assertNotEmpty($response['payload']);
/**
* Test Team Update
*/
$team = $this->client->call(Client::METHOD_PUT, '/teams/'.$teamId, array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'name' => 'Manchester'
]);
$this->assertEquals($team['headers']['status-code'], 200);
$this->assertNotEmpty($team['body']['$id']);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('timestamp', $response);
$this->assertCount(2, $response['channels']);
$this->assertContains('teams', $response['channels']);
$this->assertContains('teams.' . $teamId, $response['channels']);
$this->assertEquals('teams.update', $response['event']);
$this->assertNotEmpty($response['payload']);
$client->close();
return ['teamId' => $teamId];
}
/**
* @depends testChannelTeams
*/
public function testChannelMemberships(array $data)
{
$teamId = $data['teamId'] ?? '';
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = $this->getProject()['$id'];
$client = $this->getWebsocket(['memberships'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_'.$projectId.'='.$session
]);
$response = json_decode($client->receive(), true);
$this->assertCount(1, $response);
$this->assertArrayHasKey('memberships', $response);
$response = $this->client->call(Client::METHOD_GET, '/teams/'.$teamId.'/memberships', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()));
$membershipId = $response['body']['memberships'][0]['$id'];
/**
* Test Update Membership
*/
$roles = ['admin', 'editor', 'uncle'];
$this->client->call(Client::METHOD_PATCH, '/teams/'.$teamId.'/memberships/'.$membershipId, array_merge([
'origin' => 'http://localhost',
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), [
'roles' => $roles
]);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('timestamp', $response);
$this->assertCount(2, $response['channels']);
$this->assertContains('memberships', $response['channels']);
$this->assertContains('memberships.' . $membershipId, $response['channels']);
$this->assertEquals('teams.memberships.update', $response['event']);
$this->assertNotEmpty($response['payload']);
$client->close();
}
}