1
0
Fork 0
mirror of synced 2024-05-17 11:12:41 +12:00

changing event signature

This commit is contained in:
shimon 2023-06-02 06:54:34 +03:00
parent 73f3f0aee6
commit 6e7c160249
24 changed files with 109 additions and 47 deletions

View file

@ -3410,6 +3410,17 @@ $collections = [
'default' => null,
'filters' => [],
],
[
'array' => false,
'$id' => ID::custom('bucketInternalId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => true,
'default' => null,
'filters' => [],
],
[
'$id' => ID::custom('name'),
'type' => Database::VAR_STRING,

@ -1 +1 @@
Subproject commit 2fac7c1f390637f7dfc11882dcad6fb6508c83f1
Subproject commit 9174d8f8cb584744dd7a53f69d324f490ee82ee3

View file

@ -428,9 +428,11 @@ App::shutdown()
$responsePayload = $response->getPayload();
if (!empty($queueForEvents->getEvent())) {
if (empty($queueForEvents->getPayload())) {
$queueForEvents->setPayload($responsePayload);
}
/**
* Trigger functions.
*/

View file

@ -882,7 +882,7 @@ App::setResource('queueForDeletes', function (Connection $queue) {
return new Delete($queue);
}, ['queue']);
App::setResource('queueForEvents', function (Connection $queue) {
return new Event('', '', $queue);
return new Event($queue);
}, ['queue']);
App::setResource('queueForAudits', function (Connection $queue) {
return new Audit($queue);

View file

@ -12,6 +12,7 @@ use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Phone;
use Appwrite\Event\Usage;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Appwrite;
use Swoole\Runtime;
use Utopia\App;
@ -226,14 +227,18 @@ $args = $_SERVER['argv'];
if (isset($args[0])) {
$workerName = end($args);
} else {
throw new Exception('Missing worker name');
Console::error('Missing worker name');
}
$platform->init(Service::TYPE_WORKER, [
'workersNumber' => swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)),
'connection' => $pools->get('queue')->pop()->getResource(),
'workerName' => $workerName,
]);
try {
$platform->init(Service::TYPE_WORKER, [
'workersNum' => swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)),
'connection' => $pools->get('queue')->pop()->getResource(),
'workerName' => $workerName ?? null,
]);
} catch (\Exception $e) {
Console::error($e->getMessage() . ', File: '.$e->getFile(). ', Line: '.$e->getLine());
}
$worker = $platform->getWorker();

View file

@ -1,3 +1,3 @@
#!/bin/sh
QUEUE=v1-audits php /usr/src/code/app/worker.php audits $@
php /usr/src/code/app/worker.php audits $@

View file

@ -1,3 +1,3 @@
#!/bin/sh
QUEUE=v1-mails php /usr/src/code/app/worker.php mails $@
php /usr/src/code/app/worker.php mails $@

View file

@ -1,3 +1,3 @@
#!/bin/sh
QUEUE=v1-messaging php /usr/src/code/app/worker.php messaging $@
php /usr/src/code/app/worker.php messaging $@

View file

@ -1,3 +1,3 @@
#!/bin/sh
QUEUE=v1-webhooks php /usr/src/code/app/worker.php webhooks $@
php /usr/src/code/app/worker.php webhooks $@

View file

@ -828,18 +828,6 @@ services:
# - REDIS_HOSTS=redis
# ports:
# - "8081:8081"
# resque:
# image: appwrite/resque-web:1.1.0
# networks:
# - appwrite
# ports:
# - "5678:5678"
# environment:
# - RESQUE_WEB_HOST=redis
# - RESQUE_WEB_PORT=6379
# - RESQUE_WEB_HTTP_BASIC_AUTH_USER=user
# - RESQUE_WEB_HTTP_BASIC_AUTH_PASSWORD=password
# webgrind:
# image: 'jokkedk/webgrind:latest'
# volumes:

View file

@ -14,7 +14,12 @@ class Audit extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::AUDITS_QUEUE_NAME, Event::AUDITS_CLASS_NAME, $connection);
parent::__construct($connection);
$this
->setQueue(Event::AUDITS_QUEUE_NAME)
->setClass(Event::BUILDS_CLASS_NAME);
}
/**

View file

@ -14,7 +14,11 @@ class Build extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::BUILDS_QUEUE_NAME, Event::BUILDS_CLASS_NAME, $connection);
parent::__construct($connection);
$this
->setQueue(Event::BUILDS_QUEUE_NAME)
->setClass(Event::BUILDS_CLASS_NAME);
}
/**

View file

@ -13,7 +13,11 @@ class Certificate extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::CERTIFICATES_QUEUE_NAME, Event::CERTIFICATES_CLASS_NAME, $connection);
parent::__construct($connection);
$this
->setQueue(Event::CERTIFICATES_QUEUE_NAME)
->setClass(Event::CERTIFICATES_CLASS_NAME);
}
/**

View file

@ -15,7 +15,11 @@ class Database extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::DATABASE_QUEUE_NAME, Event::DATABASE_CLASS_NAME, $connection);
parent::__construct($connection);
$this
->setQueue(Event::DATABASE_QUEUE_NAME)
->setClass(Event::DATABASE_CLASS_NAME);
}
/**

View file

@ -17,7 +17,11 @@ class Delete extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::DELETE_QUEUE_NAME, Event::DELETE_CLASS_NAME, $connection);
parent::__construct($connection);
$this
->setQueue(Event::DELETE_QUEUE_NAME)
->setClass(Event::DELETE_CLASS_NAME);
}
/**

View file

@ -3,8 +3,9 @@
namespace Appwrite\Event;
use InvalidArgumentException;
use Resque;
use Utopia\Database\Document;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Event
{
@ -48,15 +49,10 @@ class Event
protected ?Document $user = null;
/**
* @param string $queue
* @param string $class
* @param Connection $connection
* @return void
*/
public function __construct(string $queue, string $class)
{
$this->queue = $queue;
$this->class = $class;
}
public function __construct(protected Connection $connection){}
/**
* Set queue used for this event.
@ -263,7 +259,10 @@ class Event
*/
public function trigger(): string|bool
{
return Resque::enqueue($this->queue, $this->class, [
$client = new Client($this->queue, $this->connection);
return $client->enqueue([
'project' => $this->project,
'user' => $this->user,
'payload' => $this->payload,

View file

@ -16,7 +16,11 @@ class Func extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME);
parent::__construct($connection);
$this
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
->setClass(Event::FUNCTIONS_CLASS_NAME);
}
/**

View file

@ -17,7 +17,11 @@ class Mail extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::MAILS_QUEUE_NAME, Event::MAILS_CLASS_NAME, $connection);
parent::__construct($connection);
$this
->setQueue(Event::MAILS_QUEUE_NAME)
->setClass(Event::MAILS_CLASS_NAME);
}
/**

View file

@ -12,7 +12,11 @@ class Phone extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::MESSAGING_QUEUE_NAME, Event::MESSAGING_CLASS_NAME, $connection);
parent::__construct($connection);
$this
->setQueue(Event::MESSAGING_QUEUE_NAME)
->setClass(Event::MESSAGING_CLASS_NAME);
}
/**

View file

@ -13,7 +13,11 @@ class Usage extends Event
public function __construct(protected Connection $connection)
{
parent::__construct(Event::USAGE_QUEUE_NAME, Event::USAGE_CLASS_NAME);
parent::__construct($connection);
$this
->setQueue(Event::USAGE_QUEUE_NAME)
->setClass(Event::USAGE_CLASS_NAME);
}
/**

View file

@ -17,6 +17,9 @@ class Audits extends Action
return 'audits';
}
/**
* @throws Exception
*/
public function __construct()
{
$this
@ -28,12 +31,14 @@ class Audits extends Action
}
/**
* @throws Exception
*/
public function action(Message $message, $dbForProject): void
{
$payload = $message->getPayload() ?? [];
var_dump('audits worker');
if (empty($payload)) {
throw new Exception('Missing payload');
}

View file

@ -19,6 +19,9 @@ class Mails extends Action
return 'mails';
}
/**
* @throws Exception
*/
public function __construct()
{
$this
@ -28,11 +31,15 @@ class Mails extends Action
->callback(fn($message, $register) => $this->action($message, $register));
}
/**
* @throws \PHPMailer\PHPMailer\Exception
* @throws Exception
*/
public function action(Message $message, $register): void
{
$payload = $message->getPayload() ?? [];
var_dump('mails worker');
if (empty($payload)) {
throw new Exception('Missing payload');
}

View file

@ -28,6 +28,9 @@ class Messaging extends Action
return 'messaging';
}
/**
* @throws Exception
*/
public function __construct()
{
$this->provider = App::getEnv('_APP_SMS_PROVIDER', '');
@ -43,9 +46,11 @@ class Messaging extends Action
->callback(fn($message) => $this->action($message));
}
/**
* @throws Exception
*/
public function action(Message $message): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {

View file

@ -25,10 +25,13 @@ class Webhooks extends Action
->callback(fn($message) => $this->action($message));
}
/**
* @throws Exception
*/
public function action(Message $message): void
{
$payload = $message->getPayload() ?? [];
var_dump('webhooks');
var_dump('webhooks action');
if (empty($payload)) {
throw new Exception('Missing payload');