diff --git a/Dockerfile b/Dockerfile index 1d2ac91ae0..d7e9849b1b 100755 --- a/Dockerfile +++ b/Dockerfile @@ -79,6 +79,7 @@ RUN chmod +x /usr/local/bin/doctor && \ chmod +x /usr/local/bin/migrate && \ chmod +x /usr/local/bin/realtime && \ chmod +x /usr/local/bin/schedule-functions && \ + chmod +x /usr/local/bin/schedule-executions && \ chmod +x /usr/local/bin/schedule-messages && \ chmod +x /usr/local/bin/sdks && \ chmod +x /usr/local/bin/specs && \ diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index f8690ebce6..5aa3083f3c 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -1722,7 +1722,7 @@ App::post('/v1/functions/:functionId/executions') 'functionId' => $function->getId(), 'deploymentInternalId' => $deployment->getInternalId(), 'deploymentId' => $deployment->getId(), - 'trigger' => 'http', // http / schedule / event + 'trigger' => (!is_null($scheduledAt)) ? 'schedule' : 'http', 'status' => $status, // waiting / processing / completed / failed 'responseStatusCode' => 0, 'responseHeaders' => [], @@ -1764,9 +1764,9 @@ App::post('/v1/functions/:functionId/executions') } else { $dbForConsole->createDocument('schedules', new Document([ 'region' => System::getEnv('_APP_REGION', 'default'), - 'resourceType' => 'function', - 'resourceId' => $function->getId(), - 'resourceInternalId' => $function->getInternalId(), + 'resourceType' => 'execution', + 'resourceId' => $execution->getId(), + 'resourceInternalId' => $execution->getInternalId(), 'resourceUpdatedAt' => DateTime::now(), 'projectId' => $project->getId(), 'schedule' => $scheduledAt, diff --git a/app/views/install/compose.phtml b/app/views/install/compose.phtml index f265f53c19..9643440f5e 100644 --- a/app/views/install/compose.phtml +++ b/app/views/install/compose.phtml @@ -676,6 +676,31 @@ services: - _APP_DB_USER - _APP_DB_PASS + appwrite-task-scheduler-executions: + image: /: + entrypoint: schedule-executions + container_name: appwrite-task-scheduler-executions + <<: *x-logging + restart: unless-stopped + networks: + - appwrite + depends_on: + - mariadb + - redis + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + appwrite-task-scheduler-messages: image: /: entrypoint: schedule-messages diff --git a/bin/schedule-executions b/bin/schedule-executions new file mode 100644 index 0000000000..f239cad206 --- /dev/null +++ b/bin/schedule-executions @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/cli.php schedule-executions $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 250eb8b7aa..6dd2109c6c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,8 +10,6 @@ x-logging: &x-logging max-file: "5" max-size: "10m" -version: "3" - services: traefik: image: traefik:2.11 @@ -742,6 +740,33 @@ services: - _APP_DB_USER - _APP_DB_PASS + appwrite-task-scheduler-executions: + entrypoint: schedule-executions + <<: *x-logging + container_name: appwrite-task-scheduler-executions + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - mariadb + - redis + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + appwrite-task-scheduler-messages: entrypoint: schedule-messages <<: *x-logging diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 11c9e980ed..67c28575bd 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -14,6 +14,7 @@ class Func extends Event protected string $path = ''; protected string $method = ''; protected array $headers = []; + protected ?string $functionId = null; protected ?Document $function = null; protected ?Document $execution = null; @@ -49,6 +50,28 @@ class Func extends Event return $this->function; } + /** + * Sets function id for the function event. + * + * @param string $functionId + */ + public function setFunctionId(string $functionId): self + { + $this->functionId = $functionId; + + return $this; + } + + /** + * Returns set function id for the function event. + * + * @return string|null + */ + public function getFunctionId(): ?string + { + return $this->functionId; + } + /** * Sets execution for the function event. * @@ -200,6 +223,7 @@ class Func extends Event 'project' => $this->project, 'user' => $this->user, 'function' => $this->function, + 'functionId' => $this->functionId, 'execution' => $this->execution, 'type' => $this->type, 'jwt' => $this->jwt, diff --git a/src/Appwrite/Platform/Services/Tasks.php b/src/Appwrite/Platform/Services/Tasks.php index ac1f99eec3..b7b333b2c6 100644 --- a/src/Appwrite/Platform/Services/Tasks.php +++ b/src/Appwrite/Platform/Services/Tasks.php @@ -9,6 +9,7 @@ use Appwrite\Platform\Tasks\Migrate; use Appwrite\Platform\Tasks\QueueCount; use Appwrite\Platform\Tasks\QueueRetry; use Appwrite\Platform\Tasks\ScheduleFunctions; +use Appwrite\Platform\Tasks\ScheduleExecutions; use Appwrite\Platform\Tasks\ScheduleMessages; use Appwrite\Platform\Tasks\SDKs; use Appwrite\Platform\Tasks\Specs; @@ -33,6 +34,7 @@ class Tasks extends Service ->addAction(SDKs::getName(), new SDKs()) ->addAction(SSL::getName(), new SSL()) ->addAction(ScheduleFunctions::getName(), new ScheduleFunctions()) + ->addAction(ScheduleExecutions::getName(), new ScheduleExecutions()) ->addAction(ScheduleMessages::getName(), new ScheduleMessages()) ->addAction(Specs::getName(), new Specs()) ->addAction(Upgrade::getName(), new Upgrade()) diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index a50fbb2403..be0abc4b66 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -64,7 +64,8 @@ abstract class ScheduleBase extends Action $collectionId = match ($schedule->getAttribute('resourceType')) { 'function' => 'functions', - 'message' => 'messages' + 'message' => 'messages', + 'execution' => 'executions' }; $resource = $getProjectDB($project)->getDocument( @@ -113,7 +114,8 @@ abstract class ScheduleBase extends Action } catch (\Throwable $th) { $collectionId = match ($document->getAttribute('resourceType')) { 'function' => 'functions', - 'message' => 'messages' + 'message' => 'messages', + 'execution' => 'executions' }; Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}"); diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php new file mode 100644 index 0000000000..01dde1e88b --- /dev/null +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -0,0 +1,67 @@ +schedules as $schedule) { + if (!$schedule['active'] || CronExpression::isValidExpression($schedule['schedule'])) { + unset($this->schedules[$schedule['resourceId']]); + continue; + } + + $now = new \DateTime(); + $scheduledAt = new \DateTime($schedule['schedule']); + + if ($scheduledAt > $now) { + continue; + } + + \go(function () use ($schedule, $pools, $dbForConsole) { + $queue = $pools->get('queue')->pop(); + $connection = $queue->getResource(); + + $queueForFunctions = new Func($connection); + + $queueForFunctions + ->setType('schedule') + ->setFunctionId($schedule['resource']['functionId']) + ->setExecution($schedule['resource']) + ->setMethod('POST') + ->setPath('/') + ->setProject($schedule['project']) + ->trigger(); + + $dbForConsole->deleteDocument( + 'schedules', + $schedule['$id'], + ); + + $queue->reclaim(); + + unset($this->schedules[$schedule['resourceId']]); + }); + } + } +} diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index fd417ee274..6ea972af1e 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -11,8 +11,8 @@ use Utopia\Pools\Group; class ScheduleFunctions extends ScheduleBase { - public const UPDATE_TIMER = 3; // seconds - public const ENQUEUE_TIMER = 4; // seconds + public const UPDATE_TIMER = 10; // seconds + public const ENQUEUE_TIMER = 60; // seconds private ?float $lastEnqueueUpdate = null; @@ -40,39 +40,37 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions = []; // Group executions with same delay to share one coroutine - foreach ($this->schedules as $scheduleKey => $schedule) { - if (CronExpression::isValidExpression($schedule['schedule'])) { - $cron = new CronExpression($schedule['schedule']); - $nextDate = $cron->getNextRunDate(); - } else { - try { - $nextDate = new \DateTime($schedule['schedule']); - $schedule['delete'] = true; - } catch (\Exception) { - Console::error('Failed to parse schedule: ' . $schedule['schedule']); - continue; - } + foreach ($this->schedules as $key => $schedule) { + if (!$schedule['active'] || !CronExpression::isValidExpression($schedule['schedule'])) { + unset($this->schedules[$schedule['resourceId']]); + continue; } + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); $next = DateTime::format($nextDate); + $currentTick = $next < $timeFrame; if (!$currentTick) { continue; } - $total += 1; - $delay = $nextDate->getTimestamp() - \time(); // Time to wait from now until execution needs to be queued + $total++; + + $promiseStart = \time(); // in seconds + $executionStart = $nextDate->getTimestamp(); // in seconds + $delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued if (!isset($delayedExecutions[$delay])) { $delayedExecutions[$delay] = []; } - $delayedExecutions[$delay][] = $scheduleKey; + $delayedExecutions[$delay][] = $key; } foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $pools, $dbForConsole) { + \go(function () use ($delay, $scheduleKeys, $pools) { \sleep($delay); // in seconds $queue = $pools->get('queue')->pop(); @@ -83,6 +81,7 @@ class ScheduleFunctions extends ScheduleBase if (!\array_key_exists($scheduleKey, $this->schedules)) { return; } + $schedule = $this->schedules[$scheduleKey]; $queueForFunctions = new Func($connection); @@ -94,13 +93,6 @@ class ScheduleFunctions extends ScheduleBase ->setPath('/') ->setProject($schedule['project']) ->trigger(); - - if ($schedule['delete']) { - $dbForConsole->deleteDocument( - 'schedules', - $schedule['$id'], - ); - } } $queue->reclaim(); diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 8e52973a0c..145b6ee976 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -35,7 +35,7 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($now, $schedule, $pools, $dbForConsole) { + \go(function () use ($schedule, $pools, $dbForConsole) { $queue = $pools->get('queue')->pop(); $connection = $queue->getResource(); $queueForMessaging = new Messaging($connection); diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index cbba9657ad..734fbab602 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -83,6 +83,7 @@ class Functions extends Action $eventData = $payload['payload'] ?? ''; $project = new Document($payload['project'] ?? []); $function = new Document($payload['function'] ?? []); + $functionId = $payload['functionId'] ?? ''; $user = new Document($payload['user'] ?? []); $method = $payload['method'] ?? 'POST'; $headers = $payload['headers'] ?? []; @@ -92,6 +93,10 @@ class Functions extends Action return; } + if ($function->isEmpty() && !empty($functionId)) { + $function = $dbForProject->getDocument('functions', $functionId); + } + $log->addTag('functionId', $function->getId()); $log->addTag('projectId', $project->getId()); $log->addTag('type', $type); @@ -176,6 +181,7 @@ class Functions extends Action ); break; case 'schedule': + $execution = new Document($payload['execution'] ?? []); $this->execute( log: $log, dbForProject: $dbForProject, @@ -193,7 +199,7 @@ class Functions extends Action jwt: null, event: null, eventData: null, - executionId: null, + executionId: $execution->getId() ?? null ); break; } @@ -296,7 +302,6 @@ class Functions extends Action $headers['x-appwrite-user-id'] = $user->getId() ?? ''; $headers['x-appwrite-user-jwt'] = $jwt ?? ''; - /** Create execution or update execution status */ /** Create execution or update execution status */ $execution = $dbForProject->getDocument('executions', $executionId ?? ''); if ($execution->isEmpty()) { diff --git a/tests/e2e/Services/Functions/FunctionsCustomClientTest.php b/tests/e2e/Services/Functions/FunctionsCustomClientTest.php index 0e70794879..246bed8c51 100644 --- a/tests/e2e/Services/Functions/FunctionsCustomClientTest.php +++ b/tests/e2e/Services/Functions/FunctionsCustomClientTest.php @@ -57,6 +57,7 @@ class FunctionsCustomClientTest extends Scope 'execute' => [Role::user($this->getUser()['$id'])->toString()], 'runtime' => 'php-8.0', 'entrypoint' => 'index.php', + 'logging' => true, 'events' => [ 'users.*.create', 'users.*.delete', @@ -246,6 +247,7 @@ class FunctionsCustomClientTest extends Scope $this->assertEquals(200, $function['headers']['status-code']); // Schedule execution for the future + \date_default_timezone_set('UTC'); $futureTime = (new \DateTime())->add(new \DateInterval('PT10S'))->format('Y-m-d H:i:s'); $execution = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/executions', array_merge([ 'content-type' => 'application/json', @@ -260,7 +262,7 @@ class FunctionsCustomClientTest extends Scope $executionId = $execution['body']['$id']; - sleep(12); + sleep(20); $execution = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/executions/' . $executionId, [ 'content-type' => 'application/json', @@ -281,8 +283,6 @@ class FunctionsCustomClientTest extends Scope $this->assertEquals(204, $response['headers']['status-code']); } - - public function testCreateCustomExecution(): array { /**