Merge pull request #8243 from appwrite/feat-scheduled-executions
Feat: Scheduled executions (execute later)
This commit is contained in:
commit
b3ff29dca5
14 changed files with 362 additions and 25 deletions
|
@ -79,6 +79,7 @@ RUN chmod +x /usr/local/bin/doctor && \
|
||||||
chmod +x /usr/local/bin/migrate && \
|
chmod +x /usr/local/bin/migrate && \
|
||||||
chmod +x /usr/local/bin/realtime && \
|
chmod +x /usr/local/bin/realtime && \
|
||||||
chmod +x /usr/local/bin/schedule-functions && \
|
chmod +x /usr/local/bin/schedule-functions && \
|
||||||
|
chmod +x /usr/local/bin/schedule-executions && \
|
||||||
chmod +x /usr/local/bin/schedule-messages && \
|
chmod +x /usr/local/bin/schedule-messages && \
|
||||||
chmod +x /usr/local/bin/sdks && \
|
chmod +x /usr/local/bin/sdks && \
|
||||||
chmod +x /usr/local/bin/specs && \
|
chmod +x /usr/local/bin/specs && \
|
||||||
|
|
|
@ -4550,6 +4550,17 @@ $consoleCollections = array_merge([
|
||||||
'array' => false,
|
'array' => false,
|
||||||
'filters' => [],
|
'filters' => [],
|
||||||
],
|
],
|
||||||
|
[
|
||||||
|
'$id' => ID::custom('data'),
|
||||||
|
'type' => Database::VAR_STRING,
|
||||||
|
'format' => '',
|
||||||
|
'size' => 65535,
|
||||||
|
'signed' => true,
|
||||||
|
'required' => false,
|
||||||
|
'default' => new \stdClass(),
|
||||||
|
'array' => false,
|
||||||
|
'filters' => ['json', 'encrypt'],
|
||||||
|
],
|
||||||
[
|
[
|
||||||
'$id' => ID::custom('active'),
|
'$id' => ID::custom('active'),
|
||||||
'type' => Database::VAR_BOOLEAN,
|
'type' => Database::VAR_BOOLEAN,
|
||||||
|
|
|
@ -33,6 +33,7 @@ use Utopia\Database\Helpers\Permission;
|
||||||
use Utopia\Database\Helpers\Role;
|
use Utopia\Database\Helpers\Role;
|
||||||
use Utopia\Database\Query;
|
use Utopia\Database\Query;
|
||||||
use Utopia\Database\Validator\Authorization;
|
use Utopia\Database\Validator\Authorization;
|
||||||
|
use Utopia\Database\Validator\Datetime as DatetimeValidator;
|
||||||
use Utopia\Database\Validator\Roles;
|
use Utopia\Database\Validator\Roles;
|
||||||
use Utopia\Database\Validator\UID;
|
use Utopia\Database\Validator\UID;
|
||||||
use Utopia\Storage\Device;
|
use Utopia\Storage\Device;
|
||||||
|
@ -1591,16 +1592,21 @@ App::post('/v1/functions/:functionId/executions')
|
||||||
->param('path', '/', new Text(2048), 'HTTP path of execution. Path can include query params. Default value is /', true)
|
->param('path', '/', new Text(2048), 'HTTP path of execution. Path can include query params. Default value is /', true)
|
||||||
->param('method', 'POST', new Whitelist(['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], true), 'HTTP method of execution. Default value is GET.', true)
|
->param('method', 'POST', new Whitelist(['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], true), 'HTTP method of execution. Default value is GET.', true)
|
||||||
->param('headers', [], new Assoc(), 'HTTP headers of execution. Defaults to empty.', true)
|
->param('headers', [], new Assoc(), 'HTTP headers of execution. Defaults to empty.', true)
|
||||||
|
->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled execution time in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true)
|
||||||
->inject('response')
|
->inject('response')
|
||||||
->inject('project')
|
->inject('project')
|
||||||
->inject('dbForProject')
|
->inject('dbForProject')
|
||||||
|
->inject('dbForConsole')
|
||||||
->inject('user')
|
->inject('user')
|
||||||
->inject('queueForEvents')
|
->inject('queueForEvents')
|
||||||
->inject('queueForUsage')
|
->inject('queueForUsage')
|
||||||
->inject('mode')
|
|
||||||
->inject('queueForFunctions')
|
->inject('queueForFunctions')
|
||||||
->inject('geodb')
|
->inject('geodb')
|
||||||
->action(function (string $functionId, string $body, bool $async, string $path, string $method, array $headers, Response $response, Document $project, Database $dbForProject, Document $user, Event $queueForEvents, Usage $queueForUsage, string $mode, Func $queueForFunctions, Reader $geodb) {
|
->action(function (string $functionId, string $body, bool $async, string $path, string $method, array $headers, ?string $scheduledAt, Response $response, Document $project, Database $dbForProject, Database $dbForConsole, Document $user, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb) {
|
||||||
|
|
||||||
|
if(!$async && !is_null($scheduledAt)) {
|
||||||
|
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Scheduled executions must run asynchronously. Set scheduledAt to a future date, or set async to true.');
|
||||||
|
}
|
||||||
|
|
||||||
$function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId));
|
$function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId));
|
||||||
|
|
||||||
|
@ -1705,6 +1711,12 @@ App::post('/v1/functions/:functionId/executions')
|
||||||
|
|
||||||
$executionId = ID::unique();
|
$executionId = ID::unique();
|
||||||
|
|
||||||
|
$status = $async ? 'waiting' : 'processing';
|
||||||
|
|
||||||
|
if(!is_null($scheduledAt)) {
|
||||||
|
$status = 'scheduled';
|
||||||
|
}
|
||||||
|
|
||||||
$execution = new Document([
|
$execution = new Document([
|
||||||
'$id' => $executionId,
|
'$id' => $executionId,
|
||||||
'$permissions' => !$user->isEmpty() ? [Permission::read(Role::user($user->getId()))] : [],
|
'$permissions' => !$user->isEmpty() ? [Permission::read(Role::user($user->getId()))] : [],
|
||||||
|
@ -1712,8 +1724,8 @@ App::post('/v1/functions/:functionId/executions')
|
||||||
'functionId' => $function->getId(),
|
'functionId' => $function->getId(),
|
||||||
'deploymentInternalId' => $deployment->getInternalId(),
|
'deploymentInternalId' => $deployment->getInternalId(),
|
||||||
'deploymentId' => $deployment->getId(),
|
'deploymentId' => $deployment->getId(),
|
||||||
'trigger' => 'http', // http / schedule / event
|
'trigger' => (!is_null($scheduledAt)) ? 'schedule' : 'http',
|
||||||
'status' => $async ? 'waiting' : 'processing', // waiting / processing / completed / failed
|
'status' => $status, // waiting / processing / completed / failed
|
||||||
'responseStatusCode' => 0,
|
'responseStatusCode' => 0,
|
||||||
'responseHeaders' => [],
|
'responseHeaders' => [],
|
||||||
'requestPath' => $path,
|
'requestPath' => $path,
|
||||||
|
@ -1736,20 +1748,42 @@ App::post('/v1/functions/:functionId/executions')
|
||||||
$execution = Authorization::skip(fn () => $dbForProject->createDocument('executions', $execution));
|
$execution = Authorization::skip(fn () => $dbForProject->createDocument('executions', $execution));
|
||||||
}
|
}
|
||||||
|
|
||||||
$queueForFunctions
|
if(is_null($scheduledAt)) {
|
||||||
->setType('http')
|
$queueForFunctions
|
||||||
->setExecution($execution)
|
->setType('http')
|
||||||
->setFunction($function)
|
->setExecution($execution)
|
||||||
->setBody($body)
|
->setFunction($function)
|
||||||
->setHeaders($headers)
|
->setBody($body)
|
||||||
->setPath($path)
|
->setHeaders($headers)
|
||||||
->setMethod($method)
|
->setPath($path)
|
||||||
->setJWT($jwt)
|
->setMethod($method)
|
||||||
->setProject($project)
|
->setJWT($jwt)
|
||||||
->setUser($user)
|
->setProject($project)
|
||||||
->setParam('functionId', $function->getId())
|
->setUser($user)
|
||||||
->setParam('executionId', $execution->getId())
|
->setParam('functionId', $function->getId())
|
||||||
->trigger();
|
->setParam('executionId', $execution->getId())
|
||||||
|
->trigger();
|
||||||
|
} else {
|
||||||
|
$data = [
|
||||||
|
'headers' => $headers,
|
||||||
|
'path' => $path,
|
||||||
|
'method' => $method,
|
||||||
|
'body' => $body,
|
||||||
|
'jwt' => $jwt,
|
||||||
|
];
|
||||||
|
|
||||||
|
$dbForConsole->createDocument('schedules', new Document([
|
||||||
|
'region' => System::getEnv('_APP_REGION', 'default'),
|
||||||
|
'resourceType' => 'execution',
|
||||||
|
'resourceId' => $execution->getId(),
|
||||||
|
'resourceInternalId' => $execution->getInternalId(),
|
||||||
|
'resourceUpdatedAt' => DateTime::now(),
|
||||||
|
'projectId' => $project->getId(),
|
||||||
|
'schedule' => $scheduledAt,
|
||||||
|
'data' => $data,
|
||||||
|
'active' => true,
|
||||||
|
]));
|
||||||
|
}
|
||||||
|
|
||||||
return $response
|
return $response
|
||||||
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
|
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
|
||||||
|
|
|
@ -2697,7 +2697,7 @@ App::post('/v1/messaging/messages/email')
|
||||||
'resourceInternalId' => $message->getInternalId(),
|
'resourceInternalId' => $message->getInternalId(),
|
||||||
'resourceUpdatedAt' => DateTime::now(),
|
'resourceUpdatedAt' => DateTime::now(),
|
||||||
'projectId' => $project->getId(),
|
'projectId' => $project->getId(),
|
||||||
'schedule' => $scheduledAt,
|
'schedule' => $scheduledAt,
|
||||||
'active' => true,
|
'active' => true,
|
||||||
]));
|
]));
|
||||||
|
|
||||||
|
@ -2813,7 +2813,7 @@ App::post('/v1/messaging/messages/sms')
|
||||||
'resourceInternalId' => $message->getInternalId(),
|
'resourceInternalId' => $message->getInternalId(),
|
||||||
'resourceUpdatedAt' => DateTime::now(),
|
'resourceUpdatedAt' => DateTime::now(),
|
||||||
'projectId' => $project->getId(),
|
'projectId' => $project->getId(),
|
||||||
'schedule' => $scheduledAt,
|
'schedule' => $scheduledAt,
|
||||||
'active' => true,
|
'active' => true,
|
||||||
]));
|
]));
|
||||||
|
|
||||||
|
@ -2989,7 +2989,7 @@ App::post('/v1/messaging/messages/push')
|
||||||
'resourceInternalId' => $message->getInternalId(),
|
'resourceInternalId' => $message->getInternalId(),
|
||||||
'resourceUpdatedAt' => DateTime::now(),
|
'resourceUpdatedAt' => DateTime::now(),
|
||||||
'projectId' => $project->getId(),
|
'projectId' => $project->getId(),
|
||||||
'schedule' => $scheduledAt,
|
'schedule' => $scheduledAt,
|
||||||
'active' => true,
|
'active' => true,
|
||||||
]));
|
]));
|
||||||
|
|
||||||
|
|
|
@ -698,6 +698,31 @@ $image = $this->getParam('image', '');
|
||||||
- _APP_DB_USER
|
- _APP_DB_USER
|
||||||
- _APP_DB_PASS
|
- _APP_DB_PASS
|
||||||
|
|
||||||
|
appwrite-task-scheduler-executions:
|
||||||
|
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
|
||||||
|
entrypoint: schedule-executions
|
||||||
|
container_name: appwrite-task-scheduler-executions
|
||||||
|
<<: *x-logging
|
||||||
|
restart: unless-stopped
|
||||||
|
networks:
|
||||||
|
- appwrite
|
||||||
|
depends_on:
|
||||||
|
- mariadb
|
||||||
|
- redis
|
||||||
|
environment:
|
||||||
|
- _APP_ENV
|
||||||
|
- _APP_WORKER_PER_CORE
|
||||||
|
- _APP_OPENSSL_KEY_V1
|
||||||
|
- _APP_REDIS_HOST
|
||||||
|
- _APP_REDIS_PORT
|
||||||
|
- _APP_REDIS_USER
|
||||||
|
- _APP_REDIS_PASS
|
||||||
|
- _APP_DB_HOST
|
||||||
|
- _APP_DB_PORT
|
||||||
|
- _APP_DB_SCHEMA
|
||||||
|
- _APP_DB_USER
|
||||||
|
- _APP_DB_PASS
|
||||||
|
|
||||||
appwrite-task-scheduler-messages:
|
appwrite-task-scheduler-messages:
|
||||||
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
|
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
|
||||||
entrypoint: schedule-messages
|
entrypoint: schedule-messages
|
||||||
|
|
3
bin/schedule-executions
Normal file
3
bin/schedule-executions
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
#!/bin/sh
|
||||||
|
|
||||||
|
php /usr/src/code/app/cli.php schedule-executions $@
|
|
@ -782,6 +782,33 @@ services:
|
||||||
- _APP_DB_PASS
|
- _APP_DB_PASS
|
||||||
- _APP_DATABASE_SHARED_TABLES
|
- _APP_DATABASE_SHARED_TABLES
|
||||||
|
|
||||||
|
appwrite-task-scheduler-executions:
|
||||||
|
entrypoint: schedule-executions
|
||||||
|
<<: *x-logging
|
||||||
|
container_name: appwrite-task-scheduler-executions
|
||||||
|
image: appwrite-dev
|
||||||
|
networks:
|
||||||
|
- appwrite
|
||||||
|
volumes:
|
||||||
|
- ./app:/usr/src/code/app
|
||||||
|
- ./src:/usr/src/code/src
|
||||||
|
depends_on:
|
||||||
|
- mariadb
|
||||||
|
- redis
|
||||||
|
environment:
|
||||||
|
- _APP_ENV
|
||||||
|
- _APP_WORKER_PER_CORE
|
||||||
|
- _APP_OPENSSL_KEY_V1
|
||||||
|
- _APP_REDIS_HOST
|
||||||
|
- _APP_REDIS_PORT
|
||||||
|
- _APP_REDIS_USER
|
||||||
|
- _APP_REDIS_PASS
|
||||||
|
- _APP_DB_HOST
|
||||||
|
- _APP_DB_PORT
|
||||||
|
- _APP_DB_SCHEMA
|
||||||
|
- _APP_DB_USER
|
||||||
|
- _APP_DB_PASS
|
||||||
|
|
||||||
appwrite-task-scheduler-messages:
|
appwrite-task-scheduler-messages:
|
||||||
entrypoint: schedule-messages
|
entrypoint: schedule-messages
|
||||||
<<: *x-logging
|
<<: *x-logging
|
||||||
|
|
|
@ -14,6 +14,7 @@ class Func extends Event
|
||||||
protected string $path = '';
|
protected string $path = '';
|
||||||
protected string $method = '';
|
protected string $method = '';
|
||||||
protected array $headers = [];
|
protected array $headers = [];
|
||||||
|
protected ?string $functionId = null;
|
||||||
protected ?Document $function = null;
|
protected ?Document $function = null;
|
||||||
protected ?Document $execution = null;
|
protected ?Document $execution = null;
|
||||||
|
|
||||||
|
@ -49,6 +50,28 @@ class Func extends Event
|
||||||
return $this->function;
|
return $this->function;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets function id for the function event.
|
||||||
|
*
|
||||||
|
* @param string $functionId
|
||||||
|
*/
|
||||||
|
public function setFunctionId(string $functionId): self
|
||||||
|
{
|
||||||
|
$this->functionId = $functionId;
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns set function id for the function event.
|
||||||
|
*
|
||||||
|
* @return string|null
|
||||||
|
*/
|
||||||
|
public function getFunctionId(): ?string
|
||||||
|
{
|
||||||
|
return $this->functionId;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets execution for the function event.
|
* Sets execution for the function event.
|
||||||
*
|
*
|
||||||
|
@ -200,6 +223,7 @@ class Func extends Event
|
||||||
'project' => $this->project,
|
'project' => $this->project,
|
||||||
'user' => $this->user,
|
'user' => $this->user,
|
||||||
'function' => $this->function,
|
'function' => $this->function,
|
||||||
|
'functionId' => $this->functionId,
|
||||||
'execution' => $this->execution,
|
'execution' => $this->execution,
|
||||||
'type' => $this->type,
|
'type' => $this->type,
|
||||||
'jwt' => $this->jwt,
|
'jwt' => $this->jwt,
|
||||||
|
|
|
@ -8,6 +8,7 @@ use Appwrite\Platform\Tasks\Maintenance;
|
||||||
use Appwrite\Platform\Tasks\Migrate;
|
use Appwrite\Platform\Tasks\Migrate;
|
||||||
use Appwrite\Platform\Tasks\QueueCount;
|
use Appwrite\Platform\Tasks\QueueCount;
|
||||||
use Appwrite\Platform\Tasks\QueueRetry;
|
use Appwrite\Platform\Tasks\QueueRetry;
|
||||||
|
use Appwrite\Platform\Tasks\ScheduleExecutions;
|
||||||
use Appwrite\Platform\Tasks\ScheduleFunctions;
|
use Appwrite\Platform\Tasks\ScheduleFunctions;
|
||||||
use Appwrite\Platform\Tasks\ScheduleMessages;
|
use Appwrite\Platform\Tasks\ScheduleMessages;
|
||||||
use Appwrite\Platform\Tasks\SDKs;
|
use Appwrite\Platform\Tasks\SDKs;
|
||||||
|
@ -33,6 +34,7 @@ class Tasks extends Service
|
||||||
->addAction(SDKs::getName(), new SDKs())
|
->addAction(SDKs::getName(), new SDKs())
|
||||||
->addAction(SSL::getName(), new SSL())
|
->addAction(SSL::getName(), new SSL())
|
||||||
->addAction(ScheduleFunctions::getName(), new ScheduleFunctions())
|
->addAction(ScheduleFunctions::getName(), new ScheduleFunctions())
|
||||||
|
->addAction(ScheduleExecutions::getName(), new ScheduleExecutions())
|
||||||
->addAction(ScheduleMessages::getName(), new ScheduleMessages())
|
->addAction(ScheduleMessages::getName(), new ScheduleMessages())
|
||||||
->addAction(Specs::getName(), new Specs())
|
->addAction(Specs::getName(), new Specs())
|
||||||
->addAction(Upgrade::getName(), new Upgrade())
|
->addAction(Upgrade::getName(), new Upgrade())
|
||||||
|
|
|
@ -64,7 +64,8 @@ abstract class ScheduleBase extends Action
|
||||||
|
|
||||||
$collectionId = match ($schedule->getAttribute('resourceType')) {
|
$collectionId = match ($schedule->getAttribute('resourceType')) {
|
||||||
'function' => 'functions',
|
'function' => 'functions',
|
||||||
'message' => 'messages'
|
'message' => 'messages',
|
||||||
|
'execution' => 'executions'
|
||||||
};
|
};
|
||||||
|
|
||||||
$resource = $getProjectDB($project)->getDocument(
|
$resource = $getProjectDB($project)->getDocument(
|
||||||
|
@ -113,7 +114,8 @@ abstract class ScheduleBase extends Action
|
||||||
} catch (\Throwable $th) {
|
} catch (\Throwable $th) {
|
||||||
$collectionId = match ($document->getAttribute('resourceType')) {
|
$collectionId = match ($document->getAttribute('resourceType')) {
|
||||||
'function' => 'functions',
|
'function' => 'functions',
|
||||||
'message' => 'messages'
|
'message' => 'messages',
|
||||||
|
'execution' => 'executions'
|
||||||
};
|
};
|
||||||
|
|
||||||
Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}");
|
Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}");
|
||||||
|
|
71
src/Appwrite/Platform/Tasks/ScheduleExecutions.php
Normal file
71
src/Appwrite/Platform/Tasks/ScheduleExecutions.php
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Tasks;
|
||||||
|
|
||||||
|
use Appwrite\Event\Func;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Pools\Group;
|
||||||
|
|
||||||
|
class ScheduleExecutions extends ScheduleBase
|
||||||
|
{
|
||||||
|
public const UPDATE_TIMER = 3; // seconds
|
||||||
|
public const ENQUEUE_TIMER = 4; // seconds
|
||||||
|
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'schedule-executions';
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function getSupportedResource(): string
|
||||||
|
{
|
||||||
|
return 'execution';
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function enqueueResources(Group $pools, Database $dbForConsole): void
|
||||||
|
{
|
||||||
|
$queue = $pools->get('queue')->pop();
|
||||||
|
$connection = $queue->getResource();
|
||||||
|
$queueForFunctions = new Func($connection);
|
||||||
|
|
||||||
|
foreach ($this->schedules as $schedule) {
|
||||||
|
if (!$schedule['active']) {
|
||||||
|
$dbForConsole->deleteDocument(
|
||||||
|
'schedules',
|
||||||
|
$schedule['$id'],
|
||||||
|
);
|
||||||
|
|
||||||
|
unset($this->schedules[$schedule['resourceId']]);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$now = new \DateTime();
|
||||||
|
$scheduledAt = new \DateTime($schedule['schedule']);
|
||||||
|
|
||||||
|
if ($scheduledAt > $now) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$queueForFunctions
|
||||||
|
->setType('schedule')
|
||||||
|
// Set functionId instead of function as we don't have $dbForProject
|
||||||
|
// TODO: Refactor to use function instead of functionId
|
||||||
|
->setFunctionId($schedule['resource']['functionId'])
|
||||||
|
->setExecution($schedule['resource'])
|
||||||
|
->setMethod($schedule['data']['method'] ?? 'POST')
|
||||||
|
->setPath($schedule['data']['path'] ?? '/')
|
||||||
|
->setHeaders($schedule['data']['headers'] ?? [])
|
||||||
|
->setBody($schedule['data']['body'] ?? '')
|
||||||
|
->setProject($schedule['project'])
|
||||||
|
->trigger();
|
||||||
|
|
||||||
|
$dbForConsole->deleteDocument(
|
||||||
|
'schedules',
|
||||||
|
$schedule['$id'],
|
||||||
|
);
|
||||||
|
|
||||||
|
unset($this->schedules[$schedule['resourceId']]);
|
||||||
|
}
|
||||||
|
|
||||||
|
$queue->reclaim();
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,7 +35,7 @@ class ScheduleMessages extends ScheduleBase
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
\go(function () use ($now, $schedule, $pools, $dbForConsole) {
|
\go(function () use ($schedule, $pools, $dbForConsole) {
|
||||||
$queue = $pools->get('queue')->pop();
|
$queue = $pools->get('queue')->pop();
|
||||||
$connection = $queue->getResource();
|
$connection = $queue->getResource();
|
||||||
$queueForMessaging = new Messaging($connection);
|
$queueForMessaging = new Messaging($connection);
|
||||||
|
|
|
@ -83,6 +83,7 @@ class Functions extends Action
|
||||||
$eventData = $payload['payload'] ?? '';
|
$eventData = $payload['payload'] ?? '';
|
||||||
$project = new Document($payload['project'] ?? []);
|
$project = new Document($payload['project'] ?? []);
|
||||||
$function = new Document($payload['function'] ?? []);
|
$function = new Document($payload['function'] ?? []);
|
||||||
|
$functionId = $payload['functionId'] ?? '';
|
||||||
$user = new Document($payload['user'] ?? []);
|
$user = new Document($payload['user'] ?? []);
|
||||||
$method = $payload['method'] ?? 'POST';
|
$method = $payload['method'] ?? 'POST';
|
||||||
$headers = $payload['headers'] ?? [];
|
$headers = $payload['headers'] ?? [];
|
||||||
|
@ -92,6 +93,10 @@ class Functions extends Action
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($function->isEmpty() && !empty($functionId)) {
|
||||||
|
$function = $dbForProject->getDocument('functions', $functionId);
|
||||||
|
}
|
||||||
|
|
||||||
$log->addTag('functionId', $function->getId());
|
$log->addTag('functionId', $function->getId());
|
||||||
$log->addTag('projectId', $project->getId());
|
$log->addTag('projectId', $project->getId());
|
||||||
$log->addTag('type', $type);
|
$log->addTag('type', $type);
|
||||||
|
@ -176,6 +181,7 @@ class Functions extends Action
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
case 'schedule':
|
case 'schedule':
|
||||||
|
$execution = new Document($payload['execution'] ?? []);
|
||||||
$this->execute(
|
$this->execute(
|
||||||
log: $log,
|
log: $log,
|
||||||
dbForProject: $dbForProject,
|
dbForProject: $dbForProject,
|
||||||
|
@ -193,7 +199,7 @@ class Functions extends Action
|
||||||
jwt: null,
|
jwt: null,
|
||||||
event: null,
|
event: null,
|
||||||
eventData: null,
|
eventData: null,
|
||||||
executionId: null,
|
executionId: $execution->getId() ?? null
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,6 +195,137 @@ class FunctionsCustomClientTest extends Scope
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testCreateScheduledExecution(): void
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Test for SUCCESS
|
||||||
|
*/
|
||||||
|
$function = $this->client->call(Client::METHOD_POST, '/functions', [
|
||||||
|
'content-type' => 'application/json',
|
||||||
|
'x-appwrite-project' => $this->getProject()['$id'],
|
||||||
|
'x-appwrite-key' => $this->getProject()['apiKey'],
|
||||||
|
], [
|
||||||
|
'functionId' => ID::unique(),
|
||||||
|
'name' => 'Test',
|
||||||
|
'execute' => [Role::user($this->getUser()['$id'])->toString()],
|
||||||
|
'runtime' => 'php-8.0',
|
||||||
|
'entrypoint' => 'index.php',
|
||||||
|
'events' => [
|
||||||
|
'users.*.create',
|
||||||
|
'users.*.delete',
|
||||||
|
],
|
||||||
|
'timeout' => 10,
|
||||||
|
]);
|
||||||
|
|
||||||
|
$this->assertEquals(201, $function['headers']['status-code']);
|
||||||
|
|
||||||
|
$folder = 'php';
|
||||||
|
$code = realpath(__DIR__ . '/../../../resources/functions') . "/$folder/code.tar.gz";
|
||||||
|
$this->packageCode($folder);
|
||||||
|
|
||||||
|
$deployment = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/deployments', [
|
||||||
|
'content-type' => 'multipart/form-data',
|
||||||
|
'x-appwrite-project' => $this->getProject()['$id'],
|
||||||
|
'x-appwrite-key' => $this->getProject()['apiKey'],
|
||||||
|
], [
|
||||||
|
'entrypoint' => 'index.php',
|
||||||
|
'code' => new CURLFile($code, 'application/x-gzip', \basename($code)),
|
||||||
|
'activate' => true
|
||||||
|
]);
|
||||||
|
|
||||||
|
$deploymentId = $deployment['body']['$id'] ?? '';
|
||||||
|
|
||||||
|
$this->assertEquals(202, $deployment['headers']['status-code']);
|
||||||
|
|
||||||
|
// Poll until deployment is built
|
||||||
|
while (true) {
|
||||||
|
$deployment = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/deployments/' . $deploymentId, [
|
||||||
|
'content-type' => 'application/json',
|
||||||
|
'x-appwrite-project' => $this->getProject()['$id'],
|
||||||
|
'x-appwrite-key' => $this->getProject()['apiKey'],
|
||||||
|
]);
|
||||||
|
|
||||||
|
if (
|
||||||
|
$deployment['headers']['status-code'] >= 400
|
||||||
|
|| \in_array($deployment['body']['status'], ['ready', 'failed'])
|
||||||
|
) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
\sleep(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->assertEquals('ready', $deployment['body']['status']);
|
||||||
|
|
||||||
|
$function = $this->client->call(Client::METHOD_PATCH, '/functions/' . $function['body']['$id'] . '/deployments/' . $deploymentId, [
|
||||||
|
'content-type' => 'application/json',
|
||||||
|
'x-appwrite-project' => $this->getProject()['$id'],
|
||||||
|
'x-appwrite-key' => $this->getProject()['apiKey'],
|
||||||
|
], []);
|
||||||
|
|
||||||
|
$this->assertEquals(200, $function['headers']['status-code']);
|
||||||
|
|
||||||
|
// Schedule execution for the future
|
||||||
|
\date_default_timezone_set('UTC');
|
||||||
|
$futureTime = (new \DateTime())->add(new \DateInterval('PT10S'))->format('Y-m-d H:i:s');
|
||||||
|
|
||||||
|
$execution = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/executions', array_merge([
|
||||||
|
'content-type' => 'application/json',
|
||||||
|
'x-appwrite-project' => $this->getProject()['$id'],
|
||||||
|
], $this->getHeaders()), [
|
||||||
|
'async' => true,
|
||||||
|
'scheduledAt' => $futureTime,
|
||||||
|
'path' => '/custom',
|
||||||
|
'method' => 'GET',
|
||||||
|
'body' => 'hello',
|
||||||
|
'headers' => [
|
||||||
|
'content-type' => 'application/plain',
|
||||||
|
],
|
||||||
|
]);
|
||||||
|
|
||||||
|
$this->assertEquals(202, $execution['headers']['status-code']);
|
||||||
|
$this->assertEquals('scheduled', $execution['body']['status']);
|
||||||
|
|
||||||
|
$executionId = $execution['body']['$id'];
|
||||||
|
|
||||||
|
sleep(20);
|
||||||
|
|
||||||
|
$execution = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/executions/' . $executionId, [
|
||||||
|
'content-type' => 'application/json',
|
||||||
|
'x-appwrite-project' => $this->getProject()['$id'],
|
||||||
|
'x-appwrite-key' => $this->getProject()['apiKey'],
|
||||||
|
]);
|
||||||
|
|
||||||
|
$this->assertEquals(200, $execution['headers']['status-code']);
|
||||||
|
$this->assertEquals(200, $execution['body']['responseStatusCode']);
|
||||||
|
$this->assertEquals('completed', $execution['body']['status']);
|
||||||
|
$this->assertEquals('/custom', $execution['body']['requestPath']);
|
||||||
|
$this->assertEquals('GET', $execution['body']['requestMethod']);
|
||||||
|
|
||||||
|
/* Test for FAILURE */
|
||||||
|
|
||||||
|
// Schedule synchronous execution
|
||||||
|
|
||||||
|
$execution = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/executions', array_merge([
|
||||||
|
'content-type' => 'application/json',
|
||||||
|
'x-appwrite-project' => $this->getProject()['$id'],
|
||||||
|
], $this->getHeaders()), [
|
||||||
|
'async' => false,
|
||||||
|
'scheduledAt' => $futureTime,
|
||||||
|
]);
|
||||||
|
|
||||||
|
$this->assertEquals(400, $execution['headers']['status-code']);
|
||||||
|
|
||||||
|
// Cleanup : Delete function
|
||||||
|
$response = $this->client->call(Client::METHOD_DELETE, '/functions/' . $function['body']['$id'], [
|
||||||
|
'content-type' => 'application/json',
|
||||||
|
'x-appwrite-project' => $this->getProject()['$id'],
|
||||||
|
'x-appwrite-key' => $this->getProject()['apiKey'],
|
||||||
|
], []);
|
||||||
|
|
||||||
|
$this->assertEquals(204, $response['headers']['status-code']);
|
||||||
|
}
|
||||||
|
|
||||||
public function testCreateCustomExecution(): array
|
public function testCreateCustomExecution(): array
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue