diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php index 798353ef57..f9106e47c8 100644 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ b/src/Appwrite/Platform/Tasks/schedule.php @@ -16,8 +16,8 @@ use function Swoole\Coroutine\run; class Schedule extends Action { - const FUNCTION_UPDATE_TIMER = 10; //seconds - const FUNCTION_ENQUEUE_TIMER = 60; //seconds + public const FUNCTION_UPDATE_TIMER = 10; //seconds + public const FUNCTION_ENQUEUE_TIMER = 60; //seconds public static function getName(): string { @@ -42,10 +42,10 @@ class Schedule extends Action { Console::title('Scheduler V1'); Console::success(APP_NAME . ' Scheduler v1 has started'); - + /** * Extract only nessessary attributes to lower memory used. - * + * * @var Document $schedule * @return array */ @@ -55,7 +55,7 @@ class Schedule extends Action [ $database, $reclaim ] = $getProjectDB($project); $function = $database->getDocument('functions', $schedule->getAttribute('resourceId')); $reclaim(); - + return [ 'resourceId' => $schedule->getAttribute('resourceId'), 'schedule' => $schedule->getAttribute('schedule'), @@ -64,16 +64,16 @@ class Schedule extends Action 'function' => $function, ]; }; - + $schedules = []; // Local copy of 'schedules' collection $lastSyncUpdate = DateTime::now(); - + $limit = 10000; $sum = $limit; $total = 0; $loadStart = \microtime(true); $latestDocument = null; - + while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; if ($latestDocument !== null) { @@ -84,20 +84,20 @@ class Schedule extends Action Query::equal('resourceType', ['function']), Query::equal('active', [true]), ])); - + $sum = count($results); $total = $total + $sum; foreach ($results as $document) { $schedules[$document['resourceId']] = $getSchedule($document); } - + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } - + Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); - + Console::success("Starting timers at " . DateTime::now()); - + run( function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { /** @@ -106,14 +106,14 @@ class Schedule extends Action Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { $time = DateTime::now(); $timerStart = \microtime(true); - + $limit = 1000; $sum = $limit; $total = 0; $latestDocument = null; - + Console::log("Sync tick: Running at $time"); - + while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; if ($latestDocument !== null) { @@ -124,15 +124,15 @@ class Schedule extends Action Query::equal('resourceType', ['function']), Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), ])); - + $sum = count($results); $total = $total + $sum; foreach ($results as $document) { $localDocument = $schedules[$document['resourceId']] ?? null; - + $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; $new = strtotime($document['resourceUpdatedAt']); - + if ($document['active'] === false) { Console::info("Removing: {$document['resourceId']}"); unset($schedules[$document['resourceId']]); @@ -143,13 +143,13 @@ class Schedule extends Action } $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } - + $lastSyncUpdate = $time; $timerEnd = \microtime(true); - + Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); }); - + /** * The timer to prepare soon-to-execute schedules. */ @@ -157,62 +157,62 @@ class Schedule extends Action $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { $timerStart = \microtime(true); $time = DateTime::now(); - + $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); - + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); - + $total = 0; - + $delayedExecutions = []; // Group executions with same delay to share one coroutine - + foreach ($schedules as $key => $schedule) { $cron = new CronExpression($schedule['schedule']); $nextDate = $cron->getNextRunDate(); $next = DateTime::format($nextDate); - + $currentTick = $next < $timeFrame; - - if(!$currentTick) { + + if (!$currentTick) { continue; } - + $total++; - + $promiseStart = \microtime(true); // in seconds $executionStart = $nextDate->getTimestamp(); // in seconds $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued - + $delay = \ceil(\intval($executionSleep)); - - if(!isset($delayedExecutions[$delay])) { + + if (!isset($delayedExecutions[$delay])) { $delayedExecutions[$delay] = []; } - + $delayedExecutions[$delay][] = $key; } - - foreach($delayedExecutions as $delay => $scheduleKeys) { - \go(function() use ($delay, $schedules, $scheduleKeys) { + + foreach ($delayedExecutions as $delay => $scheduleKeys) { + \go(function () use ($delay, $schedules, $scheduleKeys) { \sleep($delay); // in seconds - - foreach($scheduleKeys as $scheduleKey) { + + foreach ($scheduleKeys as $scheduleKey) { // Ensure schedule was not deleted - if(!isset($schedules[$scheduleKey])) { + if (!isset($schedules[$scheduleKey])) { return; } - + Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue } }); } - + $timerEnd = \microtime(true); $lastEnqueueUpdate = $timerStart; Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); }; - + Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); $enqueueFunctions(); }