Refactor Audit Worker to use Utopia Queue system
This commit is contained in:
parent
0c759e958d
commit
84f1a15a4c
5 changed files with 82 additions and 37 deletions
|
@ -4,6 +4,7 @@ require_once __DIR__ . '/init.php';
|
||||||
require_once __DIR__ . '/controllers/general.php';
|
require_once __DIR__ . '/controllers/general.php';
|
||||||
|
|
||||||
use Appwrite\Event\Func;
|
use Appwrite\Event\Func;
|
||||||
|
use Appwrite\Event\Audit;
|
||||||
use Appwrite\Platform\Appwrite;
|
use Appwrite\Platform\Appwrite;
|
||||||
use Utopia\CLI\CLI;
|
use Utopia\CLI\CLI;
|
||||||
use Utopia\Database\Validator\Authorization;
|
use Utopia\Database\Validator\Authorization;
|
||||||
|
@ -114,6 +115,10 @@ CLI::setResource('queueForFunctions', function (Group $pools) {
|
||||||
return new Func($pools->get('queue')->pop()->getResource());
|
return new Func($pools->get('queue')->pop()->getResource());
|
||||||
}, ['pools']);
|
}, ['pools']);
|
||||||
|
|
||||||
|
CLI::setResource('audits', function (Group $pools) {
|
||||||
|
return new Audit($pools->get('queue')->pop()->getResource());
|
||||||
|
}, ['pools']);
|
||||||
|
|
||||||
CLI::setResource('logError', function (Registry $register) {
|
CLI::setResource('logError', function (Registry $register) {
|
||||||
return function (Throwable $error, string $namespace, string $action) use ($register) {
|
return function (Throwable $error, string $namespace, string $action) use ($register) {
|
||||||
$logger = $register->get('logger');
|
$logger = $register->get('logger');
|
||||||
|
|
|
@ -847,7 +847,9 @@ App::setResource('locale', fn() => new Locale(App::getEnv('_APP_LOCALE', 'en')))
|
||||||
|
|
||||||
// Queues
|
// Queues
|
||||||
App::setResource('events', fn() => new Event('', ''));
|
App::setResource('events', fn() => new Event('', ''));
|
||||||
App::setResource('audits', fn() => new Audit());
|
App::setResource('audits', function (Group $pools) {
|
||||||
|
return new Audit($pools->get('queue')->pop()->getResource());
|
||||||
|
}, ['pools']);
|
||||||
App::setResource('mails', fn() => new Mail());
|
App::setResource('mails', fn() => new Mail());
|
||||||
App::setResource('deletes', fn() => new Delete());
|
App::setResource('deletes', fn() => new Delete());
|
||||||
App::setResource('database', fn() => new EventDatabase());
|
App::setResource('database', fn() => new EventDatabase());
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
require_once __DIR__ . '/init.php';
|
require_once __DIR__ . '/init.php';
|
||||||
|
|
||||||
|
use Appwrite\Event\Audit;
|
||||||
use Appwrite\Event\Func;
|
use Appwrite\Event\Func;
|
||||||
use Swoole\Runtime;
|
use Swoole\Runtime;
|
||||||
use Utopia\App;
|
use Utopia\App;
|
||||||
|
@ -85,6 +86,16 @@ Server::setResource('queueForFunctions', function (Registry $register) {
|
||||||
);
|
);
|
||||||
}, ['register']);
|
}, ['register']);
|
||||||
|
|
||||||
|
Server::setResource('audits', function (Registry $register) {
|
||||||
|
$pools = $register->get('pools');
|
||||||
|
return new Audit(
|
||||||
|
$pools
|
||||||
|
->get('queue')
|
||||||
|
->pop()
|
||||||
|
->getResource()
|
||||||
|
);
|
||||||
|
}, ['register']);
|
||||||
|
|
||||||
Server::setResource('logger', function ($register) {
|
Server::setResource('logger', function ($register) {
|
||||||
return $register->get('logger');
|
return $register->get('logger');
|
||||||
}, ['register']);
|
}, ['register']);
|
||||||
|
|
|
@ -1,42 +1,33 @@
|
||||||
<?php
|
<?php
|
||||||
|
|
||||||
use Appwrite\Resque\Worker;
|
require_once __DIR__ . '/../worker.php';
|
||||||
|
|
||||||
use Utopia\Audit\Audit;
|
use Utopia\Audit\Audit;
|
||||||
use Utopia\CLI\Console;
|
use Utopia\Database\Database;
|
||||||
use Utopia\Database\Document;
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Validator\Authorization;
|
||||||
|
use Utopia\Queue\Message;
|
||||||
|
use Utopia\Queue\Server;
|
||||||
|
|
||||||
require_once __DIR__ . '/../init.php';
|
Authorization::disable();
|
||||||
|
Authorization::setDefaultStatus(false);
|
||||||
|
|
||||||
Console::title('Audits V1 Worker');
|
Server::setResource('execute', function () {
|
||||||
Console::success(APP_NAME . ' audits worker v1 has started');
|
return function (
|
||||||
|
Database $dbForProject,
|
||||||
class AuditsV1 extends Worker
|
string $event,
|
||||||
{
|
array $payload,
|
||||||
public function getName(): string
|
string $mode,
|
||||||
|
string $resource,
|
||||||
|
string $userAgent,
|
||||||
|
string $ip,
|
||||||
|
Document $user,
|
||||||
|
Document $project
|
||||||
|
)
|
||||||
{
|
{
|
||||||
return "audits";
|
|
||||||
}
|
|
||||||
|
|
||||||
public function init(): void
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public function run(): void
|
|
||||||
{
|
|
||||||
$event = $this->args['event'];
|
|
||||||
$payload = $this->args['payload'];
|
|
||||||
$mode = $this->args['mode'];
|
|
||||||
$resource = $this->args['resource'];
|
|
||||||
$userAgent = $this->args['userAgent'];
|
|
||||||
$ip = $this->args['ip'];
|
|
||||||
|
|
||||||
$user = new Document($this->args['user']);
|
|
||||||
$project = new Document($this->args['project']);
|
|
||||||
|
|
||||||
$userName = $user->getAttribute('name', '');
|
$userName = $user->getAttribute('name', '');
|
||||||
$userEmail = $user->getAttribute('email', '');
|
$userEmail = $user->getAttribute('email', '');
|
||||||
|
|
||||||
$dbForProject = $this->getProjectDB($project);
|
|
||||||
$audit = new Audit($dbForProject);
|
$audit = new Audit($dbForProject);
|
||||||
$audit->log(
|
$audit->log(
|
||||||
userId: $user->getId(),
|
userId: $user->getId(),
|
||||||
|
@ -53,9 +44,41 @@ class AuditsV1 extends Worker
|
||||||
'data' => $payload,
|
'data' => $payload,
|
||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
$server->job()
|
||||||
|
->inject('message')
|
||||||
|
->inject('dbForProject')
|
||||||
|
->inject('execute')
|
||||||
|
->action(function (Message $message, Database $dbForProject, callable $execute) {
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
}
|
}
|
||||||
|
|
||||||
public function shutdown(): void
|
$event = $payload['event'] ?? '';
|
||||||
{
|
$auditPayload = $payload['payload'] ?? '';
|
||||||
}
|
$mode = $payload['mode'] ?? '';
|
||||||
}
|
$resource = $payload['resource'] ?? '';
|
||||||
|
$userAgent = $payload['userAgent'] ?? '';
|
||||||
|
$ip = $payload['ip'] ?? '';
|
||||||
|
$project = new Document($payload['project'] ?? []);
|
||||||
|
$user = new Document($payload['user'] ?? []);
|
||||||
|
|
||||||
|
$execute(
|
||||||
|
$dbForProject,
|
||||||
|
$event,
|
||||||
|
$auditPayload,
|
||||||
|
$mode,
|
||||||
|
$resource,
|
||||||
|
$userAgent,
|
||||||
|
$ip,
|
||||||
|
$user,
|
||||||
|
$project
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
$server->workerStart();
|
||||||
|
$server->start();
|
|
@ -3,6 +3,8 @@
|
||||||
namespace Appwrite\Event;
|
namespace Appwrite\Event;
|
||||||
|
|
||||||
use Resque;
|
use Resque;
|
||||||
|
use Utopia\Queue\Client;
|
||||||
|
use Utopia\Queue\Connection;
|
||||||
|
|
||||||
class Audit extends Event
|
class Audit extends Event
|
||||||
{
|
{
|
||||||
|
@ -11,7 +13,7 @@ class Audit extends Event
|
||||||
protected string $userAgent = '';
|
protected string $userAgent = '';
|
||||||
protected string $ip = '';
|
protected string $ip = '';
|
||||||
|
|
||||||
public function __construct()
|
public function __construct(protected Connection $connection)
|
||||||
{
|
{
|
||||||
parent::__construct(Event::AUDITS_QUEUE_NAME, Event::AUDITS_CLASS_NAME);
|
parent::__construct(Event::AUDITS_QUEUE_NAME, Event::AUDITS_CLASS_NAME);
|
||||||
}
|
}
|
||||||
|
@ -116,7 +118,9 @@ class Audit extends Event
|
||||||
*/
|
*/
|
||||||
public function trigger(): string|bool
|
public function trigger(): string|bool
|
||||||
{
|
{
|
||||||
return Resque::enqueue($this->queue, $this->class, [
|
$client = new Client($this->queue, $this->connection);
|
||||||
|
|
||||||
|
return $client->enqueue([
|
||||||
'project' => $this->project,
|
'project' => $this->project,
|
||||||
'user' => $this->user,
|
'user' => $this->user,
|
||||||
'payload' => $this->payload,
|
'payload' => $this->payload,
|
||||||
|
|
Loading…
Reference in a new issue