diff --git a/Dockerfile b/Dockerfile index 1d82930c1d..56c22fe679 100755 --- a/Dockerfile +++ b/Dockerfile @@ -79,6 +79,7 @@ RUN chmod +x /usr/local/bin/doctor && \ chmod +x /usr/local/bin/migrate && \ chmod +x /usr/local/bin/realtime && \ chmod +x /usr/local/bin/schedule-functions && \ + chmod +x /usr/local/bin/schedule-executions && \ chmod +x /usr/local/bin/schedule-messages && \ chmod +x /usr/local/bin/sdks && \ chmod +x /usr/local/bin/specs && \ diff --git a/app/config/collections.php b/app/config/collections.php index 66bb2606cc..1b7036c587 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -4550,6 +4550,17 @@ $consoleCollections = array_merge([ 'array' => false, 'filters' => [], ], + [ + '$id' => ID::custom('data'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 65535, + 'signed' => true, + 'required' => false, + 'default' => new \stdClass(), + 'array' => false, + 'filters' => ['json', 'encrypt'], + ], [ '$id' => ID::custom('active'), 'type' => Database::VAR_BOOLEAN, diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 491dea6b80..d5a0fced68 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -33,6 +33,7 @@ use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; +use Utopia\Database\Validator\Datetime as DatetimeValidator; use Utopia\Database\Validator\Roles; use Utopia\Database\Validator\UID; use Utopia\Storage\Device; @@ -1591,16 +1592,21 @@ App::post('/v1/functions/:functionId/executions') ->param('path', '/', new Text(2048), 'HTTP path of execution. Path can include query params. Default value is /', true) ->param('method', 'POST', new Whitelist(['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], true), 'HTTP method of execution. Default value is GET.', true) ->param('headers', [], new Assoc(), 'HTTP headers of execution. Defaults to empty.', true) + ->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled execution time in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true) ->inject('response') ->inject('project') ->inject('dbForProject') + ->inject('dbForConsole') ->inject('user') ->inject('queueForEvents') ->inject('queueForUsage') - ->inject('mode') ->inject('queueForFunctions') ->inject('geodb') - ->action(function (string $functionId, string $body, bool $async, string $path, string $method, array $headers, Response $response, Document $project, Database $dbForProject, Document $user, Event $queueForEvents, Usage $queueForUsage, string $mode, Func $queueForFunctions, Reader $geodb) { + ->action(function (string $functionId, string $body, bool $async, string $path, string $method, array $headers, ?string $scheduledAt, Response $response, Document $project, Database $dbForProject, Database $dbForConsole, Document $user, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb) { + + if(!$async && !is_null($scheduledAt)) { + throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Scheduled executions must run asynchronously. Set scheduledAt to a future date, or set async to true.'); + } $function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId)); @@ -1705,6 +1711,12 @@ App::post('/v1/functions/:functionId/executions') $executionId = ID::unique(); + $status = $async ? 'waiting' : 'processing'; + + if(!is_null($scheduledAt)) { + $status = 'scheduled'; + } + $execution = new Document([ '$id' => $executionId, '$permissions' => !$user->isEmpty() ? [Permission::read(Role::user($user->getId()))] : [], @@ -1712,8 +1724,8 @@ App::post('/v1/functions/:functionId/executions') 'functionId' => $function->getId(), 'deploymentInternalId' => $deployment->getInternalId(), 'deploymentId' => $deployment->getId(), - 'trigger' => 'http', // http / schedule / event - 'status' => $async ? 'waiting' : 'processing', // waiting / processing / completed / failed + 'trigger' => (!is_null($scheduledAt)) ? 'schedule' : 'http', + 'status' => $status, // waiting / processing / completed / failed 'responseStatusCode' => 0, 'responseHeaders' => [], 'requestPath' => $path, @@ -1736,20 +1748,42 @@ App::post('/v1/functions/:functionId/executions') $execution = Authorization::skip(fn () => $dbForProject->createDocument('executions', $execution)); } - $queueForFunctions - ->setType('http') - ->setExecution($execution) - ->setFunction($function) - ->setBody($body) - ->setHeaders($headers) - ->setPath($path) - ->setMethod($method) - ->setJWT($jwt) - ->setProject($project) - ->setUser($user) - ->setParam('functionId', $function->getId()) - ->setParam('executionId', $execution->getId()) - ->trigger(); + if(is_null($scheduledAt)) { + $queueForFunctions + ->setType('http') + ->setExecution($execution) + ->setFunction($function) + ->setBody($body) + ->setHeaders($headers) + ->setPath($path) + ->setMethod($method) + ->setJWT($jwt) + ->setProject($project) + ->setUser($user) + ->setParam('functionId', $function->getId()) + ->setParam('executionId', $execution->getId()) + ->trigger(); + } else { + $data = [ + 'headers' => $headers, + 'path' => $path, + 'method' => $method, + 'body' => $body, + 'jwt' => $jwt, + ]; + + $dbForConsole->createDocument('schedules', new Document([ + 'region' => System::getEnv('_APP_REGION', 'default'), + 'resourceType' => 'execution', + 'resourceId' => $execution->getId(), + 'resourceInternalId' => $execution->getInternalId(), + 'resourceUpdatedAt' => DateTime::now(), + 'projectId' => $project->getId(), + 'schedule' => $scheduledAt, + 'data' => $data, + 'active' => true, + ])); + } return $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) diff --git a/app/controllers/api/messaging.php b/app/controllers/api/messaging.php index e3696cc7e0..7da0348a8f 100644 --- a/app/controllers/api/messaging.php +++ b/app/controllers/api/messaging.php @@ -2697,7 +2697,7 @@ App::post('/v1/messaging/messages/email') 'resourceInternalId' => $message->getInternalId(), 'resourceUpdatedAt' => DateTime::now(), 'projectId' => $project->getId(), - 'schedule' => $scheduledAt, + 'schedule' => $scheduledAt, 'active' => true, ])); @@ -2813,7 +2813,7 @@ App::post('/v1/messaging/messages/sms') 'resourceInternalId' => $message->getInternalId(), 'resourceUpdatedAt' => DateTime::now(), 'projectId' => $project->getId(), - 'schedule' => $scheduledAt, + 'schedule' => $scheduledAt, 'active' => true, ])); @@ -2989,7 +2989,7 @@ App::post('/v1/messaging/messages/push') 'resourceInternalId' => $message->getInternalId(), 'resourceUpdatedAt' => DateTime::now(), 'projectId' => $project->getId(), - 'schedule' => $scheduledAt, + 'schedule' => $scheduledAt, 'active' => true, ])); diff --git a/app/views/install/compose.phtml b/app/views/install/compose.phtml index a6e3521e52..62e889e1ee 100644 --- a/app/views/install/compose.phtml +++ b/app/views/install/compose.phtml @@ -698,6 +698,31 @@ $image = $this->getParam('image', ''); - _APP_DB_USER - _APP_DB_PASS + appwrite-task-scheduler-executions: + image: /: + entrypoint: schedule-executions + container_name: appwrite-task-scheduler-executions + <<: *x-logging + restart: unless-stopped + networks: + - appwrite + depends_on: + - mariadb + - redis + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + appwrite-task-scheduler-messages: image: /: entrypoint: schedule-messages diff --git a/bin/schedule-executions b/bin/schedule-executions new file mode 100644 index 0000000000..f239cad206 --- /dev/null +++ b/bin/schedule-executions @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/cli.php schedule-executions $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index d5d27ce461..b68584d685 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -782,6 +782,33 @@ services: - _APP_DB_PASS - _APP_DATABASE_SHARED_TABLES + appwrite-task-scheduler-executions: + entrypoint: schedule-executions + <<: *x-logging + container_name: appwrite-task-scheduler-executions + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - mariadb + - redis + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + appwrite-task-scheduler-messages: entrypoint: schedule-messages <<: *x-logging diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 11c9e980ed..451df2b6c1 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -14,6 +14,7 @@ class Func extends Event protected string $path = ''; protected string $method = ''; protected array $headers = []; + protected ?string $functionId = null; protected ?Document $function = null; protected ?Document $execution = null; @@ -49,6 +50,28 @@ class Func extends Event return $this->function; } + /** + * Sets function id for the function event. + * + * @param string $functionId + */ + public function setFunctionId(string $functionId): self + { + $this->functionId = $functionId; + + return $this; + } + + /** + * Returns set function id for the function event. + * + * @return string|null + */ + public function getFunctionId(): ?string + { + return $this->functionId; + } + /** * Sets execution for the function event. * @@ -200,6 +223,7 @@ class Func extends Event 'project' => $this->project, 'user' => $this->user, 'function' => $this->function, + 'functionId' => $this->functionId, 'execution' => $this->execution, 'type' => $this->type, 'jwt' => $this->jwt, diff --git a/src/Appwrite/Platform/Services/Tasks.php b/src/Appwrite/Platform/Services/Tasks.php index 31465d2f26..7a0d5b60ac 100644 --- a/src/Appwrite/Platform/Services/Tasks.php +++ b/src/Appwrite/Platform/Services/Tasks.php @@ -8,6 +8,7 @@ use Appwrite\Platform\Tasks\Maintenance; use Appwrite\Platform\Tasks\Migrate; use Appwrite\Platform\Tasks\QueueCount; use Appwrite\Platform\Tasks\QueueRetry; +use Appwrite\Platform\Tasks\ScheduleExecutions; use Appwrite\Platform\Tasks\ScheduleFunctions; use Appwrite\Platform\Tasks\ScheduleMessages; use Appwrite\Platform\Tasks\SDKs; @@ -33,6 +34,7 @@ class Tasks extends Service ->addAction(SDKs::getName(), new SDKs()) ->addAction(SSL::getName(), new SSL()) ->addAction(ScheduleFunctions::getName(), new ScheduleFunctions()) + ->addAction(ScheduleExecutions::getName(), new ScheduleExecutions()) ->addAction(ScheduleMessages::getName(), new ScheduleMessages()) ->addAction(Specs::getName(), new Specs()) ->addAction(Upgrade::getName(), new Upgrade()) diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index a50fbb2403..be0abc4b66 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -64,7 +64,8 @@ abstract class ScheduleBase extends Action $collectionId = match ($schedule->getAttribute('resourceType')) { 'function' => 'functions', - 'message' => 'messages' + 'message' => 'messages', + 'execution' => 'executions' }; $resource = $getProjectDB($project)->getDocument( @@ -113,7 +114,8 @@ abstract class ScheduleBase extends Action } catch (\Throwable $th) { $collectionId = match ($document->getAttribute('resourceType')) { 'function' => 'functions', - 'message' => 'messages' + 'message' => 'messages', + 'execution' => 'executions' }; Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}"); diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php new file mode 100644 index 0000000000..2fdbd98da3 --- /dev/null +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -0,0 +1,71 @@ +get('queue')->pop(); + $connection = $queue->getResource(); + $queueForFunctions = new Func($connection); + + foreach ($this->schedules as $schedule) { + if (!$schedule['active']) { + $dbForConsole->deleteDocument( + 'schedules', + $schedule['$id'], + ); + + unset($this->schedules[$schedule['resourceId']]); + continue; + } + + $now = new \DateTime(); + $scheduledAt = new \DateTime($schedule['schedule']); + + if ($scheduledAt > $now) { + continue; + } + + $queueForFunctions + ->setType('schedule') + // Set functionId instead of function as we don't have $dbForProject + // TODO: Refactor to use function instead of functionId + ->setFunctionId($schedule['resource']['functionId']) + ->setExecution($schedule['resource']) + ->setMethod($schedule['data']['method'] ?? 'POST') + ->setPath($schedule['data']['path'] ?? '/') + ->setHeaders($schedule['data']['headers'] ?? []) + ->setBody($schedule['data']['body'] ?? '') + ->setProject($schedule['project']) + ->trigger(); + + $dbForConsole->deleteDocument( + 'schedules', + $schedule['$id'], + ); + + unset($this->schedules[$schedule['resourceId']]); + } + + $queue->reclaim(); + } +} diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 8e52973a0c..145b6ee976 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -35,7 +35,7 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($now, $schedule, $pools, $dbForConsole) { + \go(function () use ($schedule, $pools, $dbForConsole) { $queue = $pools->get('queue')->pop(); $connection = $queue->getResource(); $queueForMessaging = new Messaging($connection); diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index d4d6f75966..fb7ca0b34a 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -83,6 +83,7 @@ class Functions extends Action $eventData = $payload['payload'] ?? ''; $project = new Document($payload['project'] ?? []); $function = new Document($payload['function'] ?? []); + $functionId = $payload['functionId'] ?? ''; $user = new Document($payload['user'] ?? []); $method = $payload['method'] ?? 'POST'; $headers = $payload['headers'] ?? []; @@ -92,6 +93,10 @@ class Functions extends Action return; } + if ($function->isEmpty() && !empty($functionId)) { + $function = $dbForProject->getDocument('functions', $functionId); + } + $log->addTag('functionId', $function->getId()); $log->addTag('projectId', $project->getId()); $log->addTag('type', $type); @@ -176,6 +181,7 @@ class Functions extends Action ); break; case 'schedule': + $execution = new Document($payload['execution'] ?? []); $this->execute( log: $log, dbForProject: $dbForProject, @@ -193,7 +199,7 @@ class Functions extends Action jwt: null, event: null, eventData: null, - executionId: null, + executionId: $execution->getId() ?? null ); break; } diff --git a/tests/e2e/Services/Functions/FunctionsCustomClientTest.php b/tests/e2e/Services/Functions/FunctionsCustomClientTest.php index 222716947f..9a42d54434 100644 --- a/tests/e2e/Services/Functions/FunctionsCustomClientTest.php +++ b/tests/e2e/Services/Functions/FunctionsCustomClientTest.php @@ -195,6 +195,137 @@ class FunctionsCustomClientTest extends Scope return []; } + public function testCreateScheduledExecution(): void + { + /** + * Test for SUCCESS + */ + $function = $this->client->call(Client::METHOD_POST, '/functions', [ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ], [ + 'functionId' => ID::unique(), + 'name' => 'Test', + 'execute' => [Role::user($this->getUser()['$id'])->toString()], + 'runtime' => 'php-8.0', + 'entrypoint' => 'index.php', + 'events' => [ + 'users.*.create', + 'users.*.delete', + ], + 'timeout' => 10, + ]); + + $this->assertEquals(201, $function['headers']['status-code']); + + $folder = 'php'; + $code = realpath(__DIR__ . '/../../../resources/functions') . "/$folder/code.tar.gz"; + $this->packageCode($folder); + + $deployment = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/deployments', [ + 'content-type' => 'multipart/form-data', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ], [ + 'entrypoint' => 'index.php', + 'code' => new CURLFile($code, 'application/x-gzip', \basename($code)), + 'activate' => true + ]); + + $deploymentId = $deployment['body']['$id'] ?? ''; + + $this->assertEquals(202, $deployment['headers']['status-code']); + + // Poll until deployment is built + while (true) { + $deployment = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/deployments/' . $deploymentId, [ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ]); + + if ( + $deployment['headers']['status-code'] >= 400 + || \in_array($deployment['body']['status'], ['ready', 'failed']) + ) { + break; + } + + \sleep(1); + } + + $this->assertEquals('ready', $deployment['body']['status']); + + $function = $this->client->call(Client::METHOD_PATCH, '/functions/' . $function['body']['$id'] . '/deployments/' . $deploymentId, [ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ], []); + + $this->assertEquals(200, $function['headers']['status-code']); + + // Schedule execution for the future + \date_default_timezone_set('UTC'); + $futureTime = (new \DateTime())->add(new \DateInterval('PT10S'))->format('Y-m-d H:i:s'); + + $execution = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/executions', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'async' => true, + 'scheduledAt' => $futureTime, + 'path' => '/custom', + 'method' => 'GET', + 'body' => 'hello', + 'headers' => [ + 'content-type' => 'application/plain', + ], + ]); + + $this->assertEquals(202, $execution['headers']['status-code']); + $this->assertEquals('scheduled', $execution['body']['status']); + + $executionId = $execution['body']['$id']; + + sleep(20); + + $execution = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/executions/' . $executionId, [ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ]); + + $this->assertEquals(200, $execution['headers']['status-code']); + $this->assertEquals(200, $execution['body']['responseStatusCode']); + $this->assertEquals('completed', $execution['body']['status']); + $this->assertEquals('/custom', $execution['body']['requestPath']); + $this->assertEquals('GET', $execution['body']['requestMethod']); + + /* Test for FAILURE */ + + // Schedule synchronous execution + + $execution = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/executions', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'async' => false, + 'scheduledAt' => $futureTime, + ]); + + $this->assertEquals(400, $execution['headers']['status-code']); + + // Cleanup : Delete function + $response = $this->client->call(Client::METHOD_DELETE, '/functions/' . $function['body']['$id'], [ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + 'x-appwrite-key' => $this->getProject()['apiKey'], + ], []); + + $this->assertEquals(204, $response['headers']['status-code']); + } + public function testCreateCustomExecution(): array { /**