1
0
Fork 0
mirror of synced 2024-06-02 10:54:44 +12:00

Merge pull request #2465 from appwrite/feat-database-indexing-realtime-console

feat(realtime): add console channel
This commit is contained in:
Torsten Dittmann 2021-12-13 16:55:24 +01:00 committed by GitHub
commit a10eccbdc7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 1429 additions and 1084 deletions

View file

@ -167,7 +167,7 @@ App::init(function ($utopia, $request, $project) {
throw new Exception('JWT authentication is disabled for this project', 501);
}
break;
default:
throw new Exception('Unsupported authentication route');
break;
@ -188,7 +188,7 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits
/** @var bool $mode */
if (!empty($events->getParam('event'))) {
if(empty($events->getParam('eventData'))) {
if (empty($events->getParam('eventData'))) {
$events->setParam('eventData', $response->getPayload());
}
@ -207,10 +207,10 @@ App::shutdown(function ($utopia, $request, $response, $project, $events, $audits
if ($project->getId() !== 'console') {
$payload = new Document($response->getPayload());
$target = Realtime::fromPayload($events->getParam('event'), $payload);
$target = Realtime::fromPayload($events->getParam('event'), $payload, $project);
Realtime::send(
$project->getId(),
$target['projectId'] ?? $project->getId(),
$response->getPayload(),
$events->getParam('event'),
$target['channels'],

View file

@ -21,9 +21,6 @@ use Appwrite\Extend\PDO;
use Ahc\Jwt\JWT;
use Ahc\Jwt\JWTException;
use Appwrite\Auth\Auth;
use Appwrite\Database\Database as DatabaseOld;
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
use Appwrite\Database\Adapter\Redis as RedisAdapter;
use Appwrite\Event\Event;
use Appwrite\Network\Validator\Email;
use Appwrite\Network\Validator\IP;
@ -156,43 +153,6 @@ if(!empty($user) || !empty($pass)) {
Resque::setBackend(App::getEnv('_APP_REDIS_HOST', '').':'.App::getEnv('_APP_REDIS_PORT', ''));
}
/**
* Old DB Filters
*/
DatabaseOld::addFilter('json',
function($value) {
if(!is_array($value)) {
return $value;
}
return json_encode($value);
},
function($value) {
return json_decode($value, true);
}
);
DatabaseOld::addFilter('encrypt',
function($value) {
$key = App::getEnv('_APP_OPENSSL_KEY_V1');
$iv = OpenSSL::randomPseudoBytes(OpenSSL::cipherIVLength(OpenSSL::CIPHER_AES_128_GCM));
$tag = null;
return json_encode([
'data' => OpenSSL::encrypt($value, OpenSSL::CIPHER_AES_128_GCM, $key, 0, $iv, $tag),
'method' => OpenSSL::CIPHER_AES_128_GCM,
'iv' => bin2hex($iv),
'tag' => bin2hex($tag),
'version' => '1',
]);
},
function($value) {
$value = json_decode($value, true);
$key = App::getEnv('_APP_OPENSSL_KEY_V'.$value['version']);
return OpenSSL::decrypt($value['data'], $value['method'], $key, 0, hex2bin($value['iv']), hex2bin($value['tag']));
}
);
/**
* New DB Filters
*/

View file

@ -1,5 +1,6 @@
<?php
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Utopia\CLI\Console;
use Utopia\Database\Document;
@ -33,7 +34,7 @@ class DatabaseV1 extends Worker
if($document->isEmpty()) {
throw new Exception('Missing document');
}
switch (strval($type)) {
case DATABASE_TYPE_CREATE_ATTRIBUTE:
$this->createAttribute($collection, $document, $projectId);
@ -67,9 +68,11 @@ class DatabaseV1 extends Worker
*/
protected function createAttribute(Document $collection, Document $attribute, string $projectId): void
{
$dbForConsole = $this->getConsoleDB();
$dbForInternal = $this->getInternalDB($projectId);
$dbForExternal = $this->getExternalDB($projectId);
$event = 'database.attributes.update';
$collectionId = $collection->getId();
$key = $attribute->getAttribute('key', '');
$type = $attribute->getAttribute('type', '');
@ -81,6 +84,7 @@ class DatabaseV1 extends Worker
$format = $attribute->getAttribute('format', '');
$formatOptions = $attribute->getAttribute('formatOptions', []);
$filters = $attribute->getAttribute('filters', []);
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if(!$dbForExternal->createAttribute($collectionId, $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) {
@ -90,6 +94,20 @@ class DatabaseV1 extends Worker
} catch (\Throwable $th) {
Console::error($th->getMessage());
$dbForInternal->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'failed'));
} finally {
$target = Realtime::fromPayload($event, $attribute, $project);
Realtime::send(
projectId: 'console',
payload: $attribute->getArrayCopy(),
event: $event,
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'collectionId' => $collection->getId()
]
);
}
$dbForInternal->deleteCachedDocument('collections', $collectionId);
@ -102,11 +120,15 @@ class DatabaseV1 extends Worker
*/
protected function deleteAttribute(Document $collection, Document $attribute, string $projectId): void
{
$dbForConsole = $this->getConsoleDB();
$dbForInternal = $this->getInternalDB($projectId);
$dbForExternal = $this->getExternalDB($projectId);
$event = 'database.attributes.delete';
$collectionId = $collection->getId();
$key = $attribute->getAttribute('key', '');
$status = $attribute->getAttribute('status', '');
$project = $dbForConsole->getDocument('projects', $projectId);
// possible states at this point:
// - available: should not land in queue; controller flips these to 'deleting'
@ -122,6 +144,20 @@ class DatabaseV1 extends Worker
} catch (\Throwable $th) {
Console::error($th->getMessage());
$dbForInternal->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'stuck'));
} finally {
$target = Realtime::fromPayload($event, $attribute, $project);
Realtime::send(
projectId: 'console',
payload: $attribute->getArrayCopy(),
event: $event,
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'collectionId' => $collection->getId()
]
);
}
// The underlying database removes/rebuilds indexes when attribute is removed
@ -185,15 +221,18 @@ class DatabaseV1 extends Worker
*/
protected function createIndex(Document $collection, Document $index, string $projectId): void
{
$dbForConsole = $this->getConsoleDB();
$dbForInternal = $this->getInternalDB($projectId);
$dbForExternal = $this->getExternalDB($projectId);
$event = 'database.indexes.update';
$collectionId = $collection->getId();
$key = $index->getAttribute('key', '');
$type = $index->getAttribute('type', '');
$attributes = $index->getAttribute('attributes', []);
$lengths = $index->getAttribute('lengths', []);
$orders = $index->getAttribute('orders', []);
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if(!$dbForExternal->createIndex($collectionId, $key, $type, $attributes, $lengths, $orders)) {
@ -203,6 +242,20 @@ class DatabaseV1 extends Worker
} catch (\Throwable $th) {
Console::error($th->getMessage());
$dbForInternal->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'failed'));
} finally {
$target = Realtime::fromPayload($event, $index, $project);
Realtime::send(
projectId: 'console',
payload: $index->getArrayCopy(),
event: $event,
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'collectionId' => $collection->getId()
]
);
}
$dbForInternal->deleteCachedDocument('collections', $collectionId);
@ -215,12 +268,15 @@ class DatabaseV1 extends Worker
*/
protected function deleteIndex(Document $collection, Document $index, string $projectId): void
{
$dbForConsole = $this->getConsoleDB();
$dbForInternal = $this->getInternalDB($projectId);
$dbForExternal = $this->getExternalDB($projectId);
$collectionId = $collection->getId();
$key = $index->getAttribute('key');
$status = $index->getAttribute('status', '');
$event = 'database.indexes.delete';
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if($status !== 'failed' && !$dbForExternal->deleteIndex($collectionId, $key)) {
@ -230,6 +286,20 @@ class DatabaseV1 extends Worker
} catch (\Throwable $th) {
Console::error($th->getMessage());
$dbForInternal->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'stuck'));
} finally {
$target = Realtime::fromPayload($event, $index, $project);
Realtime::send(
projectId: 'console',
payload: $index->getArrayCopy(),
event: $event,
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'collectionId' => $collection->getId()
]
);
}
$dbForInternal->deleteCachedDocument('collections', $collectionId);

View file

@ -57,10 +57,27 @@ window.addEventListener("load", async () => {
const realtime = window.ls.container.get('realtime');
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
let current = {};
window.ls.container.get('console').subscribe('project', event => {
for (let project in event.payload) {
current[project] = event.payload[project] ?? 0;
window.ls.container.get('console').subscribe(['project', 'console'], response => {
switch (response.event) {
case 'stats.connections':
for (let project in response.payload) {
current[project] = response.payload[project] ?? 0;
}
break;
case 'database.attributes.create':
case 'database.attributes.update':
case 'database.attributes.delete':
document.dispatchEvent(new CustomEvent('database.createAttribute'));
break;
case 'database.indexes.create':
case 'database.indexes.update':
case 'database.indexes.delete':
document.dispatchEvent(new CustomEvent('database.createIndex'));
break;
}
});
while (true) {

View file

@ -234,16 +234,18 @@ class Realtime extends Adapter
/**
* Create channels array based on the event name and payload.
*
*
* @param string $event
* @param Document $payload
* @param Document|null $project
* @return array
*/
public static function fromPayload(string $event, Document $payload): array
public static function fromPayload(string $event, Document $payload, Document $project = null): array
{
$channels = [];
$roles = [];
$permissionsChanged = false;
$projectId = null;
switch (true) {
case strpos($event, 'account.recovery.') === 0:
@ -279,6 +281,13 @@ class Realtime extends Adapter
$channels[] = 'collections.' . $payload->getId();
$roles = $payload->getRead();
break;
case strpos($event, 'database.attributes.') === 0:
case strpos($event, 'database.indexes.') === 0:
$channels[] = 'console';
$projectId = 'console';
$roles = ['team:' . $project->getAttribute('teamId')];
break;
case strpos($event, 'database.documents.') === 0:
$channels[] = 'documents';
@ -306,7 +315,8 @@ class Realtime extends Adapter
return [
'channels' => $channels,
'roles' => $roles,
'permissionsChanged' => $permissionsChanged
'permissionsChanged' => $permissionsChanged,
'projectId' => $projectId
];
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,276 @@
<?php
namespace Tests\E2E\Services\Realtime;
use Exception;
use SebastianBergmann\RecursionContext\InvalidArgumentException;
use PHPUnit\Framework\ExpectationFailedException;
use PHPUnit\Framework\Exception as FrameworkException;
use Tests\E2E\Client;
use Tests\E2E\Scopes\Scope;
use Tests\E2E\Scopes\ProjectCustom;
use Tests\E2E\Scopes\SideConsole;
use WebSocket\BadOpcodeException;
use WebSocket\ConnectionException;
use WebSocket\TimeoutException;
class RealtimeConsoleClientTest extends Scope
{
use RealtimeBase;
use ProjectCustom;
use SideConsole;
public function testAttributes()
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = 'console';
$client = $this->getWebsocket(['console'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_console='. $this->getRoot()['session'],
], $projectId);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('connected', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertNotEmpty($response['data']['user']);
/**
* Test Attributes
*/
$actors = $this->client->call(Client::METHOD_POST, '/database/collections', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), [
'collectionId' => 'unique()',
'name' => 'Actors',
'read' => ['role:all'],
'write' => ['role:all'],
'permission' => 'collection'
]);
$data = ['actorsId' => $actors['body']['$id']];
$name = $this->client->call(Client::METHOD_POST, '/database/collections/' . $data['actorsId'] . '/attributes/string', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), [
'attributeId' => 'name',
'size' => 256,
'required' => true,
]);
$this->assertEquals($name['headers']['status-code'], 201);
$this->assertEquals($name['body']['key'], 'name');
$this->assertEquals($name['body']['type'], 'string');
$this->assertEquals($name['body']['size'], 256);
$this->assertEquals($name['body']['required'], true);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertEquals('database.attributes.create', $response['data']['event']);
$this->assertNotEmpty($response['data']['payload']);
$this->assertEquals('processing', $response['data']['payload']['status']);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertEquals('database.attributes.update', $response['data']['event']);
$this->assertNotEmpty($response['data']['payload']);
$this->assertEquals('available', $response['data']['payload']['status']);
$client->close();
return $data;
}
/**
* @depends testAttributes
*/
public function testIndexes(array $data)
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = 'console';
$client = $this->getWebsocket(['console'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_console='. $this->getRoot()['session'],
], $projectId);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('connected', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertNotEmpty($response['data']['user']);
/**
* Test Indexes
*/
$index = $this->client->call(Client::METHOD_POST, '/database/collections/' . $data['actorsId'] . '/indexes', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), [
'indexId' => 'key_name',
'type' => 'key',
'attributes' => [
'name',
],
]);
$this->assertEquals($index['headers']['status-code'], 201);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertEquals('database.indexes.create', $response['data']['event']);
$this->assertNotEmpty($response['data']['payload']);
$this->assertEquals('processing', $response['data']['payload']['status']);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertEquals('database.indexes.update', $response['data']['event']);
$this->assertNotEmpty($response['data']['payload']);
$this->assertEquals('available', $response['data']['payload']['status']);
$client->close();
return $data;
}
/**
* @depends testIndexes
*/
public function testDeleteIndex(array $data)
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = 'console';
$client = $this->getWebsocket(['console'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_console='. $this->getRoot()['session'],
], $projectId);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('connected', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertNotEmpty($response['data']['user']);
/**
* Test Delete Index
*/
$attribute = $this->client->call(Client::METHOD_DELETE, '/database/collections/' . $data['actorsId'] . '/indexes/key_name', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()));
$this->assertEquals($attribute['headers']['status-code'], 204);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertEquals('database.indexes.delete', $response['data']['event']);
$this->assertNotEmpty($response['data']['payload']);
$client->close();
return $data;
}
/**
* @depends testDeleteIndex
*/
public function testDeleteAttribute(array $data)
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = 'console';
$client = $this->getWebsocket(['console'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_console='. $this->getRoot()['session'],
], $projectId);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('connected', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertNotEmpty($response['data']['user']);
/**
* Test Delete Attribute
*/
$attribute = $this->client->call(Client::METHOD_DELETE, '/database/collections/' . $data['actorsId'] . '/attributes/name', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()));
$this->assertEquals($attribute['headers']['status-code'], 204);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertEquals('database.attributes.delete', $response['data']['event']);
$this->assertNotEmpty($response['data']['payload']);
$client->close();
}
}

File diff suppressed because it is too large Load diff