From 81f1eb35060c08d4a72bbfdd67d87321f0ec4aea Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 11 Jan 2024 16:06:59 +1300 Subject: [PATCH] Abstract scheduling base --- src/Appwrite/Extend/Exception.php | 3 + src/Appwrite/Platform/Tasks/Schedule.php | 244 ------------------ src/Appwrite/Platform/Tasks/ScheduleBase.php | 187 ++++++++++++++ .../Platform/Tasks/ScheduleFunctions.php | 101 ++++++++ .../Platform/Tasks/ScheduleMessage.php | 162 ------------ .../Platform/Tasks/ScheduleMessages.php | 57 ++++ 6 files changed, 348 insertions(+), 406 deletions(-) delete mode 100644 src/Appwrite/Platform/Tasks/Schedule.php create mode 100644 src/Appwrite/Platform/Tasks/ScheduleBase.php create mode 100644 src/Appwrite/Platform/Tasks/ScheduleFunctions.php delete mode 100644 src/Appwrite/Platform/Tasks/ScheduleMessage.php create mode 100644 src/Appwrite/Platform/Tasks/ScheduleMessages.php diff --git a/src/Appwrite/Extend/Exception.php b/src/Appwrite/Extend/Exception.php index ea63423c0..b1d654c40 100644 --- a/src/Appwrite/Extend/Exception.php +++ b/src/Appwrite/Extend/Exception.php @@ -263,6 +263,9 @@ class Exception extends \Exception public const MESSAGE_TARGET_NOT_SMS = 'message_target_not_sms'; public const MESSAGE_TARGET_NOT_PUSH = 'message_target_not_push'; + /** Schedules */ + public const SCHEDULE_NOT_FOUND = 'schedule_not_found'; + protected string $type = ''; protected array $errors = []; diff --git a/src/Appwrite/Platform/Tasks/Schedule.php b/src/Appwrite/Platform/Tasks/Schedule.php deleted file mode 100644 index a136ee62b..000000000 --- a/src/Appwrite/Platform/Tasks/Schedule.php +++ /dev/null @@ -1,244 +0,0 @@ -desc('Execute functions scheduled in Appwrite') - ->inject('pools') - ->inject('dbForConsole') - ->inject('getProjectDB') - ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); - } - - /** - * 1. Load all documents from 'schedules' collection to create local copy - * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute - * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. - */ - public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void - { - Console::title('Scheduler V1'); - Console::success(APP_NAME . ' Scheduler v1 has started'); - - /** - * Extract only nessessary attributes to lower memory used. - * - * @var Document $schedule - * @return array - */ - $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { - $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); - - $function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); - - return [ - 'resourceId' => $schedule->getAttribute('resourceId'), - 'schedule' => $schedule->getAttribute('schedule'), - 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), - 'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here - 'function' => $function, // TODO: @Meldiron Send only ID to worker to reduce memory usage here - ]; - }; - - $schedules = []; // Local copy of 'schedules' collection - $lastSyncUpdate = DateTime::now(); - - $limit = 10000; - $sum = $limit; - $total = 0; - $loadStart = \microtime(true); - $latestDocument = null; - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::equal('region', [App::getEnv('_APP_REGION', 'default')]), - Query::equal('resourceType', ['function']), - Query::equal('active', [true]), - ])); - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $document) { - try { - $schedules[$document['resourceId']] = $getSchedule($document); - } catch (\Throwable $th) { - Console::error("Failed to load schedule for project {$document['projectId']} and function {$document['resourceId']}"); - Console::error($th->getMessage()); - } - } - - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - $pools->reclaim(); - - Console::success("{$total} functions were loaded in " . (microtime(true) - $loadStart) . " seconds"); - - Console::success("Starting timers at " . DateTime::now()); - - run( - function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { - /** - * The timer synchronize $schedules copy with database collection. - */ - Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { - $time = DateTime::now(); - $timerStart = \microtime(true); - - $limit = 1000; - $sum = $limit; - $total = 0; - $latestDocument = null; - - Console::log("Sync tick: Running at $time"); - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::equal('region', [App::getEnv('_APP_REGION', 'default')]), - Query::equal('resourceType', ['function']), - Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), - ])); - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $document) { - $localDocument = $schedules[$document['resourceId']] ?? null; - - $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; - $new = strtotime($document['resourceUpdatedAt']); - - if ($document['active'] === false) { - Console::info("Removing: {$document['resourceId']}"); - unset($schedules[$document['resourceId']]); - } elseif ($new !== $org) { - Console::info("Updating: {$document['resourceId']}"); - $schedules[$document['resourceId']] = $getSchedule($document); - } - } - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - $lastSyncUpdate = $time; - $timerEnd = \microtime(true); - - $pools->reclaim(); - - Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); - }); - - /** - * The timer to prepare soon-to-execute schedules. - */ - $lastEnqueueUpdate = null; - $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate, $pools) { - $timerStart = \microtime(true); - $time = DateTime::now(); - - $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; - $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); - - Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); - - $total = 0; - - $delayedExecutions = []; // Group executions with same delay to share one coroutine - - foreach ($schedules as $key => $schedule) { - $cron = new CronExpression($schedule['schedule']); - $nextDate = $cron->getNextRunDate(); - $next = DateTime::format($nextDate); - - $currentTick = $next < $timeFrame; - - if (!$currentTick) { - continue; - } - - $total++; - - $promiseStart = \time(); // in seconds - $executionStart = $nextDate->getTimestamp(); // in seconds - $delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued - - if (!isset($delayedExecutions[$delay])) { - $delayedExecutions[$delay] = []; - } - - $delayedExecutions[$delay][] = $key; - } - - foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $schedules, $scheduleKeys, $pools) { - \sleep($delay); // in seconds - - $queue = $pools->get('queue')->pop(); - $connection = $queue->getResource(); - - foreach ($scheduleKeys as $scheduleKey) { - // Ensure schedule was not deleted - if (!isset($schedules[$scheduleKey])) { - return; - } - - $schedule = $schedules[$scheduleKey]; - - $functions = new Func($connection); - - $functions - ->setType('schedule') - ->setFunction($schedule['function']) - ->setMethod('POST') - ->setPath('/') - ->setProject($schedule['project']) - ->trigger(); - } - - $queue->reclaim(); - }); - } - - $timerEnd = \microtime(true); - $lastEnqueueUpdate = $timerStart; - Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds"); - }; - - Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); - $enqueueFunctions(); - } - ); - } -} diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php new file mode 100644 index 000000000..bb9a64ed6 --- /dev/null +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -0,0 +1,187 @@ +desc("Execute {$type}s scheduled in Appwrite") + ->inject('pools') + ->inject('dbForConsole') + ->inject('getProjectDB') + ->callback(fn(Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); + } + + /** + * 1. Load all documents from 'schedules' collection to create local copy + * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute + * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. + */ + public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void + { + Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); + Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); + + /** + * Extract only necessary attributes to lower memory used. + * + * @return array + * @throws Exception + * @var Document $schedule + */ + $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { + $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); + + $resource = $getProjectDB($project)->getDocument( + $schedule->getAttribute('resourceCollection'), + $schedule->getAttribute('resourceId') + ); + + return [ + 'resourceId' => $schedule->getAttribute('resourceId'), + 'schedule' => $schedule->getAttribute('schedule'), + 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), + 'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here + 'resource' => $resource, // TODO: @Meldiron Send only ID to worker to reduce memory usage here + ]; + }; + + $lastSyncUpdate = DateTime::now(); + + $limit = 10_000; + $sum = $limit; + $total = 0; + $loadStart = \microtime(true); + $latestDocument = null; + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + + if ($latestDocument) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION', 'default')]), + Query::equal('resourceType', [static::getSupportedResource()]), + Query::equal('active', [true]), + ])); + + $sum = \count($results); + $total = $total + $sum; + + foreach ($results as $document) { + try { + $this->schedules[$document['resourceId']] = $getSchedule($document); + } catch (\Throwable $th) { + Console::error("Failed to load schedule for project {$document['projectId']} {$document['resourceCollection']} {$document['resourceId']}"); + Console::error($th->getMessage()); + } + } + + $latestDocument = \end($results); + } + + $pools->reclaim(); + + Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds"); + + Console::success("Starting timers at " . DateTime::now()); + + run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { + /** + * The timer synchronize $schedules copy with database collection. + */ + Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { + $time = DateTime::now(); + $timerStart = \microtime(true); + + $limit = 1000; + $sum = $limit; + $total = 0; + $latestDocument = null; + + Console::log("Sync tick: Running at $time"); + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + + if ($latestDocument) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION', 'default')]), + Query::equal('resourceType', [static::getSupportedResource()]), + Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), + ])); + + $sum = count($results); + $total = $total + $sum; + + foreach ($results as $document) { + $localDocument = $schedules[$document['resourceId']] ?? null; + + // Check if resource has been updated since last sync + $org = $localDocument !== null ? \strtotime($localDocument['resourceUpdatedAt']) : null; + $new = \strtotime($document['resourceUpdatedAt']); + + if (!$document['active']) { + Console::info("Removing: {$document['resourceId']}"); + unset($this->schedules[$document['resourceId']]); + } elseif ($new !== $org) { + Console::info("Updating: {$document['resourceId']}"); + $this->schedules[$document['resourceId']] = $getSchedule($document); + } + } + + $latestDocument = \end($results); + } + + $lastSyncUpdate = $time; + $timerEnd = \microtime(true); + + $pools->reclaim(); + + Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); + }); + + Timer::tick(static::ENQUEUE_TIMER * 1000, fn() => + $this->enqueueResources($pools, $dbForConsole)); + + $this->enqueueResources($pools, $dbForConsole); + }); + } +} diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php new file mode 100644 index 000000000..3128eb053 --- /dev/null +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -0,0 +1,101 @@ +lastEnqueueUpdate === null ? 0 : $timerStart - $this->lastEnqueueUpdate; + $timeFrame = DateTime::addSeconds(new \DateTime(), static::ENQUEUE_TIMER - $enqueueDiff); + + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); + + $total = 0; + + $delayedExecutions = []; // Group executions with same delay to share one coroutine + + foreach ($this->schedules as $key => $schedule) { + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); + $next = DateTime::format($nextDate); + + $currentTick = $next < $timeFrame; + + if (!$currentTick) { + continue; + } + + $total++; + + $promiseStart = \time(); // in seconds + $executionStart = $nextDate->getTimestamp(); // in seconds + $delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued + + if (!isset($delayedExecutions[$delay])) { + $delayedExecutions[$delay] = []; + } + + $delayedExecutions[$delay][] = $key; + } + + foreach ($delayedExecutions as $delay => $scheduleKeys) { + \go(function () use ($delay, $scheduleKeys, $pools) { + \sleep($delay); // in seconds + + $queue = $pools->get('queue')->pop(); + $connection = $queue->getResource(); + + foreach ($scheduleKeys as $scheduleKey) { + // Ensure schedule was not deleted + if (!isset($schedules[$scheduleKey])) { + return; + } + + $schedule = $schedules[$scheduleKey]; + + $queueForFunctions = new Func($connection); + + $queueForFunctions + ->setType('schedule') + ->setFunction($schedule['resource']) + ->setMethod('POST') + ->setPath('/') + ->setProject($schedule['project']) + ->trigger(); + } + + $queue->reclaim(); + }); + } + + $timerEnd = \microtime(true); + $this->lastEnqueueUpdate = $timerStart; + Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds"); + } +} diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessage.php b/src/Appwrite/Platform/Tasks/ScheduleMessage.php deleted file mode 100644 index 849df8587..000000000 --- a/src/Appwrite/Platform/Tasks/ScheduleMessage.php +++ /dev/null @@ -1,162 +0,0 @@ -desc('Execute functions scheduled in Appwrite') - ->inject('pools') - ->inject('dbForConsole') - ->inject('getProjectDB') - ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); - } - - /** - * 1. Load all documents from 'schedules' collection to create local copy - * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute - * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. - */ - public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void - { - Console::title('Scheduler V1'); - Console::success(APP_NAME . ' Scheduler v1 has started'); - - $schedules = []; // Local copy of 'schedules' collection - $lastSyncUpdate = DateTime::now(); - - $limit = 10000; - $sum = $limit; - $total = 0; - $loadStart = \microtime(true); - $latestDocument = null; - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - try { - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::lessThanEqual('schedule', DateTime::formatTz(DateTime::now())), - Query::equal('resourceType', ['message']), - Query::equal('active', [true]), - ])); - } catch (\Exception $e) { - var_dump($e->getTraceAsString()); - } - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $schedule) { - $schedules[$schedule->getId()] = $schedule; - } - - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - $pools->reclaim(); - - Console::success("{$total} message were loaded in " . (microtime(true) - $loadStart) . " seconds"); - - Console::success("Starting timers at " . DateTime::now()); - - run( - function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $pools) { - /** - * The timer synchronize $schedules copy with database collection. - */ - Timer::tick(self::MESSAGE_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $pools) { - $time = DateTime::now(); - $timerStart = \microtime(true); - - $limit = 1000; - $sum = $limit; - $total = 0; - $latestDocument = null; - - Console::log("Sync tick: Running at $time"); - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::lessThanEqual('schedule', DateTime::formatTz(DateTime::now())), - Query::equal('resourceType', ['message']), - Query::equal('active', [true]), - ])); - $sum = \count($results); - $total = $total + $sum; - foreach ($results as $schedule) { - $schedules[$schedule->getId()] = $schedule; - } - - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - $lastSyncUpdate = $time; - $timerEnd = \microtime(true); - - $pools->reclaim(); - - Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); - }); - - /** - * The timer to prepare soon-to-execute schedules. - */ - $enqueueMessages = function () use (&$schedules, $pools, $dbForConsole) { - foreach ($schedules as $scheduleId => $schedule) { - \go(function () use (&$schedules, $schedule, $pools, $dbForConsole) { - $queue = $pools->get('queue')->pop(); - $connection = $queue->getResource(); - $queueForMessaging = new Messaging($connection); - $queueForDeletes = new Delete($connection); - $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); - $queueForMessaging - ->setMessageId($schedule->getAttribute('resourceId')) - ->setProject($project) - ->trigger(); - $schedule->setAttribute('active', false); - $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule); - - $queueForDeletes - ->setType(DELETE_TYPE_SCHEDULES) - ->setDocument($schedule); - - $queue->reclaim(); - unset($schedules[$schedule->getId()]); - }); - } - }; - - Timer::tick(self::MESSAGE_ENQUEUE_TIMER * 1000, fn () => $enqueueMessages()); - $enqueueMessages(); - } - ); - } -} diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php new file mode 100644 index 000000000..6d938a8b4 --- /dev/null +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -0,0 +1,57 @@ +schedules as $schedule) { + \go(function () use ($schedule, $pools, $dbForConsole) { + $queue = $pools->get('queue')->pop(); + $connection = $queue->getResource(); + $queueForMessaging = new Messaging($connection); + $queueForDeletes = new Delete($connection); + + $queueForMessaging + ->setMessageId($schedule['resourceId']) + ->setProject($schedule['project']) + ->trigger(); + + $queueForDeletes + ->setType(DELETE_TYPE_SCHEDULES) + ->setDocument($schedule) + ->trigger(); + + $queue->reclaim(); + + unset($this->schedules[$schedule->getId()]); + }); + } + } +}