1
0
Fork 0
mirror of synced 2024-06-15 01:04:51 +12:00

fix: workers

This commit is contained in:
Torsten Dittmann 2022-06-19 12:54:47 +02:00
parent ddbe1edf6d
commit c89b62a7b4
14 changed files with 79 additions and 67 deletions

View file

@ -208,6 +208,7 @@ App::shutdown(function (App $utopia, Request $request, Response $response, Docum
Realtime::send(
projectId: $target['projectId'] ?? $project->getId(),
projectInternalId: $project->getInternalId(),
payload: $events->getPayload(),
events: $allEvents,
channels: $target['channels'],

View file

@ -839,6 +839,7 @@ App::setResource('project', function ($dbForConsole, $request, $console) {
App::setResource('console', function () {
return new Document([
'$id' => 'console',
'$internalId' => 'console',
'name' => 'Appwrite',
'$collection' => 'projects',
'description' => 'Appwrite core engine',

View file

@ -224,7 +224,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
}
$event = [
'project' => 'console',
'projectId' => 'console',
'projectInternalId' => 'console',
'roles' => ['team:' . $stats->get($projectId, 'teamId')],
'data' => [
'events' => ['stats.connections'],
@ -251,7 +252,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$payload = ['response' => 'WS:/v1/realtime:passed'];
$event = [
'project' => 'console',
'projectId' => 'console',
'projectInternalId' => 'console',
'roles' => ['role:guest'],
'data' => [
'events' => ['test.event'],
@ -292,7 +294,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {
$projectId = $event['project'];
$projectId = $event['projectId'];
$projectInternalId = $event['projectInternalId'];
$userId = $event['userId'];
if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
@ -303,7 +306,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$roles = Auth::getRoles($user);
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
$realtime->subscribe($projectId, $projectInternalId, $connection, $roles, $realtime->connections[$connection]['channels']);
call_user_func($returnDatabase);
}
@ -326,7 +329,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
);
if (($num = count($receivers)) > 0) {
$stats->incr($event['project'], 'messages', $num);
$stats->incr($event['projectId'], 'messages', $num);
}
});
} catch (\Throwable $th) {
@ -421,7 +424,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
throw new Exception('Missing channels', 1008);
}
$realtime->subscribe($project->getId(), $connection, $roles, $channels);
$realtime->subscribe($project->getId(), $project->getInternalId(), $connection, $roles, $channels);
$user = empty($user->getId()) ? null : $response->output($user, Response::MODEL_USER);
@ -480,8 +483,8 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
$cache = new Cache(new RedisCache($redis));
$database = new Database(new MariaDB($db), $cache);
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$database->setNamespace("_{$realtime->connections[$connection]['projectId']}");
$database->setNamespace("_{$realtime->connections[$connection]['projectInternalId']}");
var_dump($realtime->connections[$connection]['projectInternalId']);
/*
* Abuse Check
*
@ -530,7 +533,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
$roles = Auth::getRoles($user);
$channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId());
$realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels);
$realtime->subscribe($realtime->connections[$connection]['projectId'], $realtime->connections[$connection]['projectInternalId'], $connection, $roles, $channels);
$user = $response->output($user, Response::MODEL_USER);
$server->send([$connection], json_encode([

View file

@ -37,7 +37,7 @@ class AuditsV1 extends Worker
$userName = $user->getAttribute('name', '');
$userEmail = $user->getAttribute('email', '');
$dbForProject = $this->getProjectDB($project->getId());
$dbForProject = $this->getProjectDB($project->getInternalId());
$audit = new Audit($dbForProject);
$audit->log(
userId: $user->getId(),

View file

@ -58,7 +58,7 @@ class BuildsV1 extends Worker
protected function buildDeployment(Document $project, Document $function, Document $deployment)
{
$dbForProject = $this->getProjectDB($project->getId());
$dbForProject = $this->getProjectDB($project->getInternalId());
$function = $dbForProject->getDocument('functions', $function->getId());
if ($function->isEmpty()) {
@ -139,6 +139,7 @@ class BuildsV1 extends Worker
Realtime::send(
projectId: 'console',
projectInternalId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
@ -211,6 +212,7 @@ class BuildsV1 extends Worker
);
Realtime::send(
projectId: 'console',
projectInternalId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],

View file

@ -69,7 +69,8 @@ class DatabaseV1 extends Worker
protected function createAttribute(Document $collection, Document $attribute, string $projectId): void
{
$dbForConsole = $this->getConsoleDB();
$dbForProject = $this->getProjectDB($projectId);
$project = $dbForConsole->getDocument('projects', $projectId);
$dbForProject = $this->getProjectDB($project->getInternalId());
$events = Event::generateEvents('collections.[collectionId].attributes.[attributeId].update', [
'collectionId' => $collection->getId(),
@ -91,7 +92,6 @@ class DatabaseV1 extends Worker
$format = $attribute->getAttribute('format', '');
$formatOptions = $attribute->getAttribute('formatOptions', []);
$filters = $attribute->getAttribute('filters', []);
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if (!$dbForProject->createAttribute('collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) {
@ -111,6 +111,7 @@ class DatabaseV1 extends Worker
Realtime::send(
projectId: 'console',
projectInternalId: 'console',
payload: $attribute->getArrayCopy(),
events: $events,
channels: $target['channels'],
@ -133,7 +134,8 @@ class DatabaseV1 extends Worker
protected function deleteAttribute(Document $collection, Document $attribute, string $projectId): void
{
$dbForConsole = $this->getConsoleDB();
$dbForProject = $this->getProjectDB($projectId);
$project = $dbForConsole->getDocument('projects', $projectId);
$dbForProject = $this->getProjectDB($project->getInternalId());
$events = Event::generateEvents('collections.[collectionId].attributes.[attributeId].delete', [
'collectionId' => $collection->getId(),
@ -142,7 +144,6 @@ class DatabaseV1 extends Worker
$collectionId = $collection->getId();
$key = $attribute->getAttribute('key', '');
$status = $attribute->getAttribute('status', '');
$project = $dbForConsole->getDocument('projects', $projectId);
// possible states at this point:
// - available: should not land in queue; controller flips these to 'deleting'
@ -168,6 +169,7 @@ class DatabaseV1 extends Worker
Realtime::send(
projectId: 'console',
projectInternalId: 'console',
payload: $attribute->getArrayCopy(),
events: $events,
channels: $target['channels'],
@ -243,7 +245,8 @@ class DatabaseV1 extends Worker
protected function createIndex(Document $collection, Document $index, string $projectId): void
{
$dbForConsole = $this->getConsoleDB();
$dbForProject = $this->getProjectDB($projectId);
$project = $dbForConsole->getDocument('projects', $projectId);
$dbForProject = $this->getProjectDB($project->getInternalId());
$events = Event::generateEvents('collections.[collectionId].indexes.[indexId].update', [
'collectionId' => $collection->getId(),
@ -255,7 +258,6 @@ class DatabaseV1 extends Worker
$attributes = $index->getAttribute('attributes', []);
$lengths = $index->getAttribute('lengths', []);
$orders = $index->getAttribute('orders', []);
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if (!$dbForProject->createIndex('collection_' . $collection->getInternalId(), $key, $type, $attributes, $lengths, $orders)) {
@ -275,6 +277,7 @@ class DatabaseV1 extends Worker
Realtime::send(
projectId: 'console',
projectInternalId: 'console',
payload: $index->getArrayCopy(),
events: $events,
channels: $target['channels'],
@ -297,7 +300,8 @@ class DatabaseV1 extends Worker
protected function deleteIndex(Document $collection, Document $index, string $projectId): void
{
$dbForConsole = $this->getConsoleDB();
$dbForProject = $this->getProjectDB($projectId);
$project = $dbForConsole->getDocument('projects', $projectId);
$dbForProject = $this->getProjectDB($project->getInternalId());
$events = Event::generateEvents('collections.[collectionId].indexes.[indexId].delete', [
'collectionId' => $collection->getId(),
@ -305,7 +309,6 @@ class DatabaseV1 extends Worker
]);
$key = $index->getAttribute('key');
$status = $index->getAttribute('status', '');
$project = $dbForConsole->getDocument('projects', $projectId);
try {
if ($status !== 'failed' && !$dbForProject->deleteIndex('collection_' . $collection->getInternalId(), $key)) {
@ -325,6 +328,7 @@ class DatabaseV1 extends Worker
Realtime::send(
projectId: 'console',
projectInternalId: 'console',
payload: $index->getArrayCopy(),
events: $events,
channels: $target['channels'],

View file

@ -48,25 +48,25 @@ class DeletesV1 extends Worker
switch ($document->getCollection()) {
case DELETE_TYPE_COLLECTIONS:
$this->deleteCollection($document, $project->getId());
$this->deleteCollection($document, $project->getInternalId());
break;
case DELETE_TYPE_PROJECTS:
$this->deleteProject($document);
break;
case DELETE_TYPE_FUNCTIONS:
$this->deleteFunction($document, $project->getId());
$this->deleteFunction($document, $project->getInternalId());
break;
case DELETE_TYPE_DEPLOYMENTS:
$this->deleteDeployment($document, $project->getId());
$this->deleteDeployment($document, $project->getInternalId());
break;
case DELETE_TYPE_USERS:
$this->deleteUser($document, $project->getId());
$this->deleteUser($document, $project->getInternalId());
break;
case DELETE_TYPE_TEAMS:
$this->deleteMemberships($document, $project->getId());
$this->deleteMemberships($document, $project->getInternalId());
break;
case DELETE_TYPE_BUCKETS:
$this->deleteBucket($document, $project->getId());
$this->deleteBucket($document, $project->getInternalId());
break;
default:
Console::error('No lazy delete operation available for document of type: ' . $document->getCollection());
@ -471,7 +471,7 @@ class DeletesV1 extends Worker
$chunk++;
/** @var string[] $projectIds */
$projectIds = array_map(fn (Document $project) => $project->getId(), $projects);
$projectIds = array_map(fn (Document $project) => $project->getInternalId(), $projects);
$sum = count($projects);

View file

@ -44,7 +44,7 @@ class FunctionsV1 extends Worker
$user = new Document($this->args['user'] ?? []);
$payload = json_encode($this->args['payload'] ?? []);
$database = $this->getProjectDB($project->getId());
$database = $this->getProjectDB($project->getInternalId());
/**
* Handle Event execution.
@ -340,6 +340,7 @@ class FunctionsV1 extends Worker
);
Realtime::send(
projectId: 'console',
projectInternalId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
@ -347,6 +348,7 @@ class FunctionsV1 extends Worker
);
Realtime::send(
projectId: $project->getId(),
projectInternalId: $project->getInternalId(),
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],

View file

@ -45,7 +45,7 @@
"utopia-php/cache": "0.6.*",
"utopia-php/cli": "0.12.*",
"utopia-php/config": "0.2.*",
"utopia-php/database": "dev-bug-last-internal-id as 0.18.1",
"utopia-php/database": "0.18.*",
"utopia-php/locale": "0.4.*",
"utopia-php/registry": "0.5.*",
"utopia-php/preloader": "0.2.*",

39
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "2fb5e41dc214b996d95f49a49d64d00e",
"content-hash": "1593c7304ba026d4073de336227858f3",
"packages": [
{
"name": "adhocore/jwt",
@ -1583,16 +1583,16 @@
},
{
"name": "squizlabs/php_codesniffer",
"version": "3.7.0",
"version": "3.7.1",
"source": {
"type": "git",
"url": "https://github.com/squizlabs/PHP_CodeSniffer.git",
"reference": "a2cd51b45bcaef9c1f2a4bda48f2dd2fa2b95563"
"reference": "1359e176e9307e906dc3d890bcc9603ff6d90619"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/squizlabs/PHP_CodeSniffer/zipball/a2cd51b45bcaef9c1f2a4bda48f2dd2fa2b95563",
"reference": "a2cd51b45bcaef9c1f2a4bda48f2dd2fa2b95563",
"url": "https://api.github.com/repos/squizlabs/PHP_CodeSniffer/zipball/1359e176e9307e906dc3d890bcc9603ff6d90619",
"reference": "1359e176e9307e906dc3d890bcc9603ff6d90619",
"shasum": ""
},
"require": {
@ -1635,7 +1635,7 @@
"source": "https://github.com/squizlabs/PHP_CodeSniffer",
"wiki": "https://github.com/squizlabs/PHP_CodeSniffer/wiki"
},
"time": "2022-06-13T06:31:38+00:00"
"time": "2022-06-18T07:21:10+00:00"
},
{
"name": "symfony/deprecation-contracts",
@ -2107,16 +2107,16 @@
},
{
"name": "utopia-php/database",
"version": "dev-bug-last-internal-id",
"version": "0.18.2",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/database.git",
"reference": "ef5bd5a1e761e04352a64e39b5b1b37e7e7a36cc"
"reference": "781c31238b03ebc530a225973c4d1a921e00c2b9"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/database/zipball/ef5bd5a1e761e04352a64e39b5b1b37e7e7a36cc",
"reference": "ef5bd5a1e761e04352a64e39b5b1b37e7e7a36cc",
"url": "https://api.github.com/repos/utopia-php/database/zipball/781c31238b03ebc530a225973c4d1a921e00c2b9",
"reference": "781c31238b03ebc530a225973c4d1a921e00c2b9",
"shasum": ""
},
"require": {
@ -2165,9 +2165,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/database/issues",
"source": "https://github.com/utopia-php/database/tree/bug-last-internal-id"
"source": "https://github.com/utopia-php/database/tree/0.18.2"
},
"time": "2022-06-14T12:42:10+00:00"
"time": "2022-06-19T09:32:07+00:00"
},
{
"name": "utopia-php/domains",
@ -5346,18 +5346,9 @@
"time": "2022-05-17T05:48:52+00:00"
}
],
"aliases": [
{
"package": "utopia-php/database",
"version": "dev-bug-last-internal-id",
"alias": "0.18.1",
"alias_normalized": "0.18.1.0"
}
],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": {
"utopia-php/database": 20
},
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": {
@ -5379,5 +5370,5 @@
"platform-overrides": {
"php": "8.0"
},
"plugin-api-version": "2.2.0"
"plugin-api-version": "2.3.0"
}

View file

@ -4,7 +4,7 @@ namespace Appwrite\Messaging;
abstract class Adapter
{
abstract public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void;
abstract public function subscribe(string $projectId, string $projectInternalId, mixed $identifier, array $roles, array $channels): void;
abstract public function unsubscribe(mixed $identifier): void;
abstract public static function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void;
abstract public static function send(string $projectId, string $projectInternalId, array $payload, array $events, array $channels, array $roles, array $options): void;
}

View file

@ -42,7 +42,7 @@ class Realtime extends Adapter
* @param array $channels
* @return void
*/
public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void
public function subscribe(string $projectId, string $projectInternalId, mixed $identifier, array $roles, array $channels): void
{
if (!isset($this->subscriptions[$projectId])) { // Init Project
$this->subscriptions[$projectId] = [];
@ -60,6 +60,7 @@ class Realtime extends Adapter
$this->connections[$identifier] = [
'projectId' => $projectId,
'projectInternalId' => $projectInternalId,
'roles' => $roles,
'channels' => $channels
];
@ -120,6 +121,7 @@ class Realtime extends Adapter
/**
* Sends an event to the Realtime Server
* @param string $projectId
* @param string $projectInternalId
* @param array $payload
* @param string $event
* @param array $channels
@ -127,7 +129,7 @@ class Realtime extends Adapter
* @param array $options
* @return void
*/
public static function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void
public static function send(string $projectId, string $projectInternalId, array $payload, array $events, array $channels, array $roles, array $options = []): void
{
if (empty($channels) || empty($roles) || empty($projectId)) {
return;
@ -139,10 +141,11 @@ class Realtime extends Adapter
$redis = new \Redis(); //TODO: make this part of the constructor
$redis->connect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
$redis->publish('realtime', json_encode([
'project' => $projectId,
'projectId' => $projectId,
'projectInternalId' => $projectInternalId,
'userId' => $userId,
'roles' => $roles,
'permissionsChanged' => $permissionsChanged,
'userId' => $userId,
'data' => [
'events' => $events,
'channels' => $channels,
@ -172,11 +175,11 @@ class Realtime extends Adapter
/**
* Check if project has subscriber.
*/
if (isset($this->subscriptions[$event['project']])) {
if (isset($this->subscriptions[$event['projectId']])) {
/**
* Iterate through each role.
*/
foreach ($this->subscriptions[$event['project']] as $role => $subscription) {
foreach ($this->subscriptions[$event['projectId']] as $role => $subscription) {
/**
* Iterate through each channel.
*/
@ -185,13 +188,13 @@ class Realtime extends Adapter
* Check if channel has subscriber. Also taking care of the role in the event and the wildcard role.
*/
if (
\array_key_exists($channel, $this->subscriptions[$event['project']][$role])
\array_key_exists($channel, $this->subscriptions[$event['projectId']][$role])
&& (\in_array($role, $event['roles']) || \in_array('role:all', $event['roles']))
) {
/**
* Saving all connections that are allowed to receive this event.
*/
foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $id) {
foreach (array_keys($this->subscriptions[$event['projectId']][$role][$channel]) as $id) {
/**
* To prevent duplicates, we save the connections as array keys.
*/
@ -248,6 +251,7 @@ class Realtime extends Adapter
$roles = [];
$permissionsChanged = false;
$projectId = null;
$projectInternalId = null;
// TODO: add method here to remove all the magic index accesses
$parts = explode('.', $event);
@ -275,6 +279,7 @@ class Realtime extends Adapter
if (in_array($parts[2], ['attributes', 'indexes'])) {
$channels[] = 'console';
$projectId = 'console';
$projectInternalId = 'console';
$roles = ['team:' . $project->getAttribute('teamId')];
} elseif ($parts[2] === 'documents') {
if ($collection->isEmpty()) {
@ -322,7 +327,8 @@ class Realtime extends Adapter
'channels' => $channels,
'roles' => $roles,
'permissionsChanged' => $permissionsChanged,
'projectId' => $projectId
'projectId' => $projectId,
'projectInternalId' => $projectInternalId
];
}
}

View file

@ -12,6 +12,7 @@ class MessagingGuestTest extends TestCase
$realtime = new Realtime();
$realtime->subscribe(
'1',
'1',
1,
['role:guest'],
@ -19,7 +20,7 @@ class MessagingGuestTest extends TestCase
);
$event = [
'project' => '1',
'projectId' => '1',
'roles' => ['role:all'],
'data' => [
'channels' => [
@ -109,7 +110,7 @@ class MessagingGuestTest extends TestCase
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['project'] = '2';
$event['projectId'] = '2';
$receivers = $realtime->getSubscribers($event);

View file

@ -21,6 +21,7 @@ class MessagingTest extends TestCase
$realtime = new Realtime();
$realtime->subscribe(
'1',
'1',
1,
['user:123', 'role:member', 'team:abc', 'team:abc/administrator', 'team:abc/moderator', 'team:def', 'team:def/guest'],
@ -28,7 +29,7 @@ class MessagingTest extends TestCase
);
$event = [
'project' => '1',
'projectId' => '1',
'roles' => ['role:all'],
'data' => [
'channels' => [
@ -117,7 +118,7 @@ class MessagingTest extends TestCase
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['project'] = '2';
$event['projectId'] = '2';
$receivers = $realtime->getSubscribers($event);