1
0
Fork 0
mirror of synced 2024-06-26 18:20:43 +12:00

Refactor func event triggering

This commit is contained in:
Matej Baco 2022-11-15 19:13:17 +01:00
parent 280a44e1cd
commit 44a82de09b
11 changed files with 106 additions and 109 deletions

View file

@ -3,6 +3,7 @@
require_once __DIR__ . '/init.php';
require_once __DIR__ . '/controllers/general.php';
use Appwrite\Event\Func;
use Appwrite\Platform\Appwrite;
use Utopia\CLI\CLI;
use Utopia\Database\Validator\Authorization;
@ -109,6 +110,10 @@ CLI::setResource('influxdb', function (Registry $register) {
return $database;
}, ['register']);
CLI::setResource('functions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());
}, ['pools']);
CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');

View file

@ -5,6 +5,7 @@ use Appwrite\Auth\Auth;
use Appwrite\Event\Build;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Validator\Event as ValidatorEvent;
use Appwrite\Extend\Exception;
use Appwrite\Utopia\Database\Validator\CustomId;
@ -41,7 +42,6 @@ use Utopia\CLI\Console;
use Utopia\Database\Validator\Roles;
use Utopia\Validator\Boolean;
use Utopia\Database\Exception\Duplicate as DuplicateException;
use Utopia\Queue\Client as QueueClient;
include_once __DIR__ . '/../shared/api.php';
@ -1061,7 +1061,8 @@ App::post('/v1/functions/:functionId/executions')
->inject('usage')
->inject('mode')
->inject('pools')
->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Group $pools) {
->inject('functions')
->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Group $pools, Func $functions) {
$function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId));
@ -1149,18 +1150,15 @@ App::post('/v1/functions/:functionId/executions')
->setContext('function', $function);
if ($async) {
$queueForFunctions = new QueueClient(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource());
$queueForFunctions->enqueue([
'type' => 'http',
'value' => [
'type' => 'http',
'execution' => $execution,
'function' => $function,
'data' => $data,
'jwt' => $jwt,
'project' => $project,
'user' => $user
]]);
$functions
->setType('http')
->setExecution($execution)
->setFunction($function)
->setData($data)
->setJWT($jwt)
->setProject($project)
->setUser($user)
->trigger();
return $response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)

View file

@ -5,6 +5,7 @@ use Appwrite\Event\Audit;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Usage\Stats;
@ -251,7 +252,8 @@ App::shutdown()
->inject('database')
->inject('mode')
->inject('dbForProject')
->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject) use ($parseLabel) {
->inject('functions')
->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject, Func $functions) use ($parseLabel) {
$responsePayload = $response->getPayload();
@ -262,9 +264,13 @@ App::shutdown()
/**
* Trigger functions.
*/
$events
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$functions
->setData(\json_encode($events->getPayload()))
->setProject($events->getProject())
->setUser($events->getUser())
->setEvent($events->getEvent())
->setParam('functionId', $events->getParam('functionId'))
->setParam('executionId', $events->getParam('executionId'))
->trigger();
/**

View file

@ -71,6 +71,7 @@ use Utopia\Pools\Group;
use Utopia\Pools\Pool;
use Ahc\Jwt\JWT;
use Ahc\Jwt\JWTException;
use Appwrite\Event\Func;
use MaxMind\Db\Reader;
use PHPMailer\PHPMailer\PHPMailer;
use Swoole\Database\PDOProxy;
@ -843,6 +844,9 @@ App::setResource('mails', fn() => new Mail());
App::setResource('deletes', fn() => new Delete());
App::setResource('database', fn() => new EventDatabase());
App::setResource('messaging', fn() => new Phone());
App::setResource('functions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());
}, ['pools']);
App::setResource('usage', function ($register) {
return new Stats($register->get('statsd'));
}, ['register']);
@ -1023,13 +1027,12 @@ App::setResource('console', function () {
}, []);
App::setResource('queue', function () {
$fallbackForRedis = AppwriteURL::unparse([
'scheme' => 'redis',
'host' => App::getEnv('_APP_REDIS_HOST', 'redis'),
'port' => App::getEnv('_APP_REDIS_PORT', '6379'),
'user' => App::getEnv('_APP_REDIS_USER', ''),
'pass' => App::getEnv('_APP_REDIS_PASS', ''),
'scheme' => 'redis',
'host' => App::getEnv('_APP_REDIS_HOST', 'redis'),
'port' => App::getEnv('_APP_REDIS_PORT', '6379'),
'user' => App::getEnv('_APP_REDIS_USER', ''),
'pass' => App::getEnv('_APP_REDIS_PASS', ''),
]);
$connection = App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis);

View file

@ -2,6 +2,7 @@
require_once __DIR__ . '/init.php';
use Appwrite\Event\Func;
use Swoole\Runtime;
use Utopia\App;
use Utopia\Cache\Adapter\Sharding;
@ -68,7 +69,12 @@ Server::setResource('cache', function (Registry $register) {
return new Cache(new Sharding($adapters));
}, ['register']);
App::setResource('logger', function ($register) {
Server::setResource('functions', function (Registry $register) {
$pools = $register->get('pools');
return new Func($pools->get('queue')->pop()->getResource());
}, ['register']);
Server::setResource('logger', function ($register) {
return $register->get('logger');
}, ['register']);

View file

@ -1,6 +1,7 @@
<?php
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Appwrite\Utopia\Response\Model\Deployment;
@ -105,6 +106,8 @@ class BuildsV1 extends Worker
$build->setAttribute('status', 'building');
$build = $dbForProject->updateDocument('builds', $buildId, $build);
$data = $deployment->getArrayCopy(array_keys($deploymentModel->getRules()));
/** Trigger Webhook */
$deploymentModel = new Deployment();
@ -114,14 +117,22 @@ class BuildsV1 extends Worker
->setEvent('functions.[functionId].deployments.[deploymentId].update')
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules())))
->setPayload($data)
->trigger();
/** Trigger Functions */
$deploymentUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
global $register;
$pools = $register->get('pools');
$connection = $pools->get('queue')->pop();
$functions = new Func($connection->getResource());
$functions
->setData(\json_encode($data))
->setProject($project)
->setEvent('functions.[functionId].deployments.[deploymentId].update')
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->trigger();
$connection->reclaim();
/** Trigger Realtime */
$allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [

View file

@ -5,6 +5,7 @@ require_once __DIR__ . '/../worker.php';
use Utopia\Queue;
use Utopia\Queue\Message;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Usage\Stats;
use Appwrite\Utopia\Response\Model\Execution;
@ -33,6 +34,7 @@ $execute = function (
Document $project,
Document $function,
Database $dbForProject,
Func $functions,
string $trigger,
string $executionId = null,
string $event = null,
@ -206,9 +208,13 @@ $execute = function (
->trigger();
/** Trigger Functions */
$executionUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$functions
->setData($data)
->setProject($project)
->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->trigger();
/** Trigger realtime event */
@ -259,9 +265,10 @@ $server = new Queue\Server($adapter);
$server->job()
->inject('message')
->inject('dbForProject')
->action(function (Message $message, Database $dbForProject) use ($execute) {
$args = $message->getPayload()['value'] ?? [];
$type = $message->getPayload()['type'] ?? '';
->inject('functions')
->action(function (Message $message, Database $dbForProject, Func $functions) use ($execute) {
$args = $message->getPayload() ?? [];
$type = $args['type'] ?? '';
$events = $args['events'] ?? [];
$project = new Document($args['project'] ?? []);
$user = new Document($args['user'] ?? []);
@ -317,34 +324,20 @@ $server->job()
/**
* Handle Schedule and HTTP execution.
*/
$user = new Document($args['user'] ?? []);
$project = new Document($args['project'] ?? []);
$execution = new Document($args['execution'] ?? []);
$function = new Document($args['function'] ?? []);
switch ($type) {
case 'http':
$jwt = $args['jwt'] ?? '';
$data = $args['data'] ?? '';
$execution = new Document($args['execution'] ?? []);
$user = new Document($args['user'] ?? []);
$function = $dbForProject->getDocument('functions', $execution->getAttribute('functionId'));
call_user_func($execute, $project, $function, $dbForProject, 'http', $execution->getId(), null, null, $data, $user, $jwt);
call_user_func($execute, $project, $function, $dbForProject, $functions, 'http', $execution->getId(), null, null, $data, $user, $jwt);
break;
case 'schedule':
/*
* 1. Get Original Task
* 2. Check for updates
* If has updates skip task and don't reschedule
* If status not equal to play skip task
* 3. Check next run date, update task and add new job at the given date
* 4. Execute task (set optional timeout)
* 5. Update task response to log
* On success reset error count
* On failure add error count
* If error count bigger than allowed change status to pause
*/
call_user_func($execute, $project, $function, $dbForProject, 'schedule', null, null, null, null, null, null);
call_user_func($execute, $project, $function, $dbForProject, $functions, 'schedule', null, null, null, null, null, null);
break;
}
});

View file

@ -53,7 +53,7 @@
"utopia-php/domains": "1.1.*",
"utopia-php/framework": "0.25.*",
"utopia-php/image": "0.5.*",
"utopia-php/queue": "dev-upgrade-libs as 0.4.1",
"utopia-php/queue": "0.4.*",
"utopia-php/locale": "0.4.*",
"utopia-php/logger": "0.3.*",
"utopia-php/orchestration": "0.9.*",

21
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "3e24f0f02ec826898d50e4119f9eb226",
"content-hash": "f2faa670abaa356f9b14e4f1c3542b33",
"packages": [
{
"name": "adhocore/jwt",
@ -2359,16 +2359,16 @@
},
{
"name": "utopia-php/queue",
"version": "dev-upgrade-libs",
"version": "0.4.1",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/queue.git",
"reference": "310aaac74d2287d3d9450a532658247cdfe5e72c"
"reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/310aaac74d2287d3d9450a532658247cdfe5e72c",
"reference": "310aaac74d2287d3d9450a532658247cdfe5e72c",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/0b69ede484a04c567cbb202f592d8e5e3cd2433e",
"reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e",
"shasum": ""
},
"require": {
@ -2414,9 +2414,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/queue/issues",
"source": "https://github.com/utopia-php/queue/tree/upgrade-libs"
"source": "https://github.com/utopia-php/queue/tree/0.4.1"
},
"time": "2022-11-15T16:35:56+00:00"
"time": "2022-11-15T16:56:37+00:00"
},
{
"name": "utopia-php/registry",
@ -5277,12 +5277,6 @@
}
],
"aliases": [
{
"package": "utopia-php/queue",
"version": "dev-upgrade-libs",
"alias": "0.4.1",
"alias_normalized": "0.4.1.0"
},
{
"package": "utopia-php/registry",
"version": "dev-feat-allow-params",
@ -5292,7 +5286,6 @@
],
"minimum-stability": "stable",
"stability-flags": {
"utopia-php/queue": 20,
"utopia-php/registry": 20
},
"prefer-stable": false,

View file

@ -6,6 +6,8 @@ use DateTime;
use Resque;
use ResqueScheduler;
use Utopia\Database\Document;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Func extends Event
{
@ -15,7 +17,7 @@ class Func extends Event
protected ?Document $function = null;
protected ?Document $execution = null;
public function __construct()
public function __construct(protected Connection $connection)
{
parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME);
}
@ -143,36 +145,16 @@ class Func extends Event
*/
public function trigger(): string|bool
{
return Resque::enqueue($this->queue, $this->class, [
'project' => $this->project,
'user' => $this->user,
'function' => $this->function,
'execution' => $this->execution,
'type' => $this->type,
'jwt' => $this->jwt,
'payload' => $this->payload,
'data' => $this->data
]);
}
$queue = new Client(Event::FUNCTIONS_QUEUE_NAME, $this->connection);
/**
* Schedules the function event and schedules it in the functions worker queue.
*
* @param \DateTime|int $at
* @return void
* @throws \Resque_Exception
* @throws \ResqueScheduler_InvalidTimestampException
*/
public function schedule(DateTime|int $at): void
{
ResqueScheduler::enqueueAt($at, $this->queue, $this->class, [
'project' => $this->project,
'user' => $this->user,
'function' => $this->function,
'execution' => $this->execution,
return $queue->enqueue([
'type' => $this->type,
'payload' => $this->payload,
'data' => $this->data
'execution' => $this->execution,
'function' => $this->function,
'data' => $this->data,
'jwt' => $this->jwt,
'project' => $this->project,
'user' => $this->user
]);
}
}

View file

@ -14,6 +14,7 @@ use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Client as Worker;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use function Swoole\Coroutine\run;
@ -203,6 +204,9 @@ class Schedule extends Action
\go(function () use ($delay, $schedules, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
if (!isset($schedules[$scheduleKey])) {
@ -211,20 +215,16 @@ class Schedule extends Action
$schedule = $schedules[$scheduleKey];
$queue = $pools->get('queue')->pop();
$functions = new Func($connection);
$worker = new Worker(Event::FUNCTIONS_QUEUE_NAME, $queue->getResource());
$worker
->enqueue([
'type' => 'schedule',
'value' => [
'project' => $schedule['project'],
'function' => $schedule['function'],
]
]);
$queue->reclaim();
$functions
->setType('schedule')
->setFunction($schedule['function'])
->setProject($schedule['project'])
->trigger();
}
$queue->reclaim();
});
}