1
0
Fork 0
mirror of synced 2024-10-05 20:53:27 +13:00

WIP: Schedulded executions

This commit is contained in:
Matej Bačo 2024-06-07 19:05:29 +00:00
parent 3e47b43908
commit b8b81a9bd1
3 changed files with 46 additions and 21 deletions

View file

@ -32,6 +32,7 @@ use Utopia\Database\Helpers\Permission;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Database\Validator\Datetime as DatetimeValidator;
use Utopia\Database\Validator\Roles;
use Utopia\Database\Validator\UID;
use Utopia\Storage\Device;
@ -1511,16 +1512,21 @@ App::post('/v1/functions/:functionId/executions')
->param('path', '/', new Text(2048), 'HTTP path of execution. Path can include query params. Default value is /', true)
->param('method', 'POST', new Whitelist(['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], true), 'HTTP method of execution. Default value is GET.', true)
->param('headers', [], new Assoc(), 'HTTP headers of execution. Defaults to empty.', true)
->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled execution time in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true)
->inject('response')
->inject('project')
->inject('dbForProject')
->inject('dbForConsole')
->inject('user')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('mode')
->inject('queueForFunctions')
->inject('geodb')
->action(function (string $functionId, string $body, bool $async, string $path, string $method, array $headers, Response $response, Document $project, Database $dbForProject, Document $user, Event $queueForEvents, Usage $queueForUsage, string $mode, Func $queueForFunctions, Reader $geodb) {
->action(function (string $functionId, string $body, bool $async, string $path, string $method, array $headers, ?string $scheduledAt, Response $response, Document $project, Database $dbForProject, Database $dbForConsole, Document $user, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb) {
if(!$async && !is_null($scheduledAt)) {
throw new Exception(Exception::GENERAL_QUERY_INVALID, 'Scheduled executions must run asynchronously. Don\'t set scheduledAt to execute immediately, or set async to true.');
}
$function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId));
@ -1625,6 +1631,12 @@ App::post('/v1/functions/:functionId/executions')
$executionId = ID::unique();
$status = $async ? 'waiting' : 'processing';
if(!is_null($scheduledAt)) {
$status = 'scheduled';
}
$execution = new Document([
'$id' => $executionId,
'$permissions' => !$user->isEmpty() ? [Permission::read(Role::user($user->getId()))] : [],
@ -1633,7 +1645,7 @@ App::post('/v1/functions/:functionId/executions')
'deploymentInternalId' => $deployment->getInternalId(),
'deploymentId' => $deployment->getId(),
'trigger' => 'http', // http / schedule / event
'status' => $async ? 'waiting' : 'processing', // waiting / processing / completed / failed
'status' => $status, // waiting / processing / completed / failed
'responseStatusCode' => 0,
'responseHeaders' => [],
'requestPath' => $path,
@ -1656,20 +1668,33 @@ App::post('/v1/functions/:functionId/executions')
$execution = Authorization::skip(fn () => $dbForProject->createDocument('executions', $execution));
}
$queueForFunctions
->setType('http')
->setExecution($execution)
->setFunction($function)
->setBody($body)
->setHeaders($headers)
->setPath($path)
->setMethod($method)
->setJWT($jwt)
->setProject($project)
->setUser($user)
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->trigger();
if(is_null($scheduledAt)) {
$queueForFunctions
->setType('http')
->setExecution($execution)
->setFunction($function)
->setBody($body)
->setHeaders($headers)
->setPath($path)
->setMethod($method)
->setJWT($jwt)
->setProject($project)
->setUser($user)
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->trigger();
} else {
$dbForConsole->createDocument('schedules', new Document([
'region' => System::getEnv('_APP_REGION', 'default'),
'resourceType' => 'function',
'resourceId' => $function->getId(),
'resourceInternalId' => $function->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'active' => true,
]));
}
return $response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)

View file

@ -2697,7 +2697,7 @@ App::post('/v1/messaging/messages/email')
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'schedule' => $scheduledAt,
'active' => true,
]));
@ -2813,7 +2813,7 @@ App::post('/v1/messaging/messages/sms')
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'schedule' => $scheduledAt,
'active' => true,
]));
@ -2989,7 +2989,7 @@ App::post('/v1/messaging/messages/push')
'resourceInternalId' => $message->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,
'schedule' => $scheduledAt,
'active' => true,
]));

View file

@ -41,7 +41,7 @@ class ScheduleFunctions extends ScheduleBase
$delayedExecutions = []; // Group executions with same delay to share one coroutine
foreach ($this->schedules as $key => $schedule) {
$cron = new CronExpression($schedule['schedule']);
$cron = new CronExpression($schedule['schedule']); // TODO: Allow schedule to be DateTime, like ScheduleMessaging.php
$nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate);