diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index ded7301208..33e030dbb7 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -11,98 +11,27 @@ use Utopia\Database\Document; use Utopia\Database\Query; use Swoole\Timer; -const FUNCTION_UPDATE_TIMER = 60; //seconds -const FUNCTION_ENQUEUE_TIMER = 60; //seconds -const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min -const FUNCTION_RESET_TIMER_TO = 50; // seconds - -sleep(4); +const FUNCTION_UPDATE_TIMER = 10; //seconds +const FUNCTION_ENQUEUE_TIMER = 10; //seconds /** - * 1. first load from db with limit+offset - * 2. creating a 5-min offset array ($queue) - * 3. First timer runs every minute, looping over $queue time slots (each slot is 1-min delta) - * if the function matches the current minute it should be dispatched to the functions worker. - * Then another translation is made to the cron pattern if it is in the next 5-min window - * it is assigned again to the $queue. . - * 4. Second timer runs every X min and updates the $functions (large) list. - * The query fetches only functions that [resourceUpdatedAt] attr changed from the - * last time the timer that was fired (X min) - * If the function was deleted it is unsets from the list ($functions) and the $queue. - * In the end of the timer the $queue is created again. - * + * 1. Load all documents from the 'schedules' collection to create a local copy + * 2. Create a timer that syncs all changes from the 'schedules' collection to the local copy. Only changes are read, thanks to the 'resourceUpdatedAt' attribute + * 3. Create a timer that prepares coroutines for soon-to-execute schedules. Once prepared, each coroutine sleeps until the exact execution time before sending the request to the worker. */ $cli ->task('schedule') ->desc('Function scheduler task') -->action(function () use ($register) { +->action(function () { Console::title('Scheduler V1'); Console::success(APP_NAME . 
' Scheduler v1 has started'); $dbForConsole = getConsoleDB(); /** - * @return void - */ - $createQueue = function () use (&$functions, &$queue): void { - $loadStart = \microtime(true); - /** - * Creating smaller functions list containing 5-min timeframe. - */ - $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME); - foreach ($functions as $function) { - $cron = new CronExpression($function['schedule']); - $next = DateTime::format($cron->getNextRunDate()); - - if ($next < $timeFrame) { - $queue[$next][$function['resourceId']] = $function; - } - } - $loadEnd = \microtime(true); - Console::success("Queue was built in " . ($loadEnd - $loadStart) . " seconds"); - //var_dump($queue); - }; - - /** - * @param string $id - * @param string $resourceId - * @return void - */ - $removeFromQueue = function (string $id, string $resourceId) use (&$queue, &$functions, $dbForConsole) { - if (array_key_exists($resourceId, $functions)) { - unset($functions[$resourceId]); - $dbForConsole->deleteDocument('schedules', $id); - Console::error("Removing :{$resourceId} from functions list"); - } - - foreach ($queue as $slot => $schedule) { - if (array_key_exists($resourceId, $schedule)) { - unset($queue[$slot][$resourceId]); - Console::error("Removing :{$resourceId} from queue slot $slot"); - } - } - }; - - /** - * @param string $resourceId - * @param array $update - * @return void - */ - $updateQueue = function (string $resourceId, array $update) use (&$queue, &$functions): void { - - $functions[$resourceId] = $update; - Console::error("Updating :{$resourceId} in functions list"); - - foreach ($queue as $slot => $schedule) { - if (array_key_exists($resourceId, $schedule)) { - $queue[$slot][$resourceId] = $update; - Console::error("Updating :{$resourceId} in queue slot $slot"); - } - } - }; - - /** + * Extract only necessary attributes to lower memory used. 
+ * * @var Document $schedule * @return array */ @@ -115,10 +44,11 @@ $cli ]; } + $schedules = []; // Local copy of 'schedules' collection + $lastSyncUpdate = DateTime::now(); + $limit = 10000; $sum = $limit; - $functions = []; - $queue = []; $total = 0; $loadStart = \microtime(true); $latestDocument = null; @@ -137,39 +67,33 @@ $cli $sum = count($results); $total = $total + $sum; foreach ($results as $document) { - $functions[$document['resourceId']] = getsSheduleAttributes($document); + $schedules[$document['resourceId']] = getsSheduleAttributes($document); } $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } $loadEnd = \microtime(true); - Console::success("{$total} functions where loaded in " . ($loadEnd - $loadStart) . " seconds"); - $createQueue(); - $lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_UPDATE_TIMER); - - do { - $second = time() % 60; - } while ($second < FUNCTION_RESET_TIMER_TO); + Console::success("{$total} schedules where loaded in " . ($loadEnd - $loadStart) . " seconds"); $time = DateTime::now(); - Console::success("Starting timers at {$time}"); + Console::success("Starting timers at {$time}"); - - /** - * The timer updates $functions from db on last resourceUpdatedAt attr in X-min. - */ Co\run( - function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { - Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { + function () use ($dbForConsole, &$schedules, &$lastSyncUpdate) { + /** + * The timer synchronizes the $schedules copy with the database collection. 
+ */ Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate) { $time = DateTime::now(); + $timerStart = \microtime(true); + $limit = 1000; $sum = $limit; $total = 0; $latestDocument = null; - $timerStart = \microtime(true); - //Console::warning("Update proc started at: $time last update was at $lastUpdate"); + Console::log("Sync tick: Running at $time"); while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; @@ -179,82 +103,86 @@ $cli $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ Query::equal('region', [App::getEnv('_APP_REGION')]), Query::equal('resourceType', ['function']), - Query::greaterThanEqual('resourceUpdatedAt', $lastUpdate), + Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), ])); $sum = count($results); $total = $total + $sum; foreach ($results as $document) { - $org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null; + $localDocument = $schedules[$document['resourceId']] ?? null; + + $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; $new = strtotime($document['resourceUpdatedAt']); - if ($document['active'] === false) { - //Console::warning("Removing: {$document['resourceId']}"); - $removeFromQueue($document->getId(), $document['resourceId']); - } elseif ($new > $org) { - //Console::warning("Updating: {$document['resourceId']}"); - $updateQueue($document['resourceId'], getsSheduleAttributes($document)); + + if ($document['active'] === false) { + Console::info("Removing: {$document['resourceId']}"); + unset($schedules[$document['resourceId']]); + } elseif ($new !== $org) { + Console::info("Updating: {$document['resourceId']}"); + $schedules[$document['resourceId']] = getsSheduleAttributes($document); } } $latestDocument = !empty(array_key_last($results)) ? 
$results[array_key_last($results)] : null; } - $lastUpdate = DateTime::now(); - $createQueue(); + $lastSyncUpdate = $time; $timerEnd = \microtime(true); - //Console::warning("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds"); + Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); }); /** - * The timer sends to worker every 1 min and re-enqueue matched functions. + * The timer to prepare soon-to-execute schedules. */ - Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) { + $lastEnqueueUpdate = null; + $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { $timerStart = \microtime(true); $time = DateTime::now(); - $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME); - $slot = (new \DateTime())->format('Y-m-d H:i:00.000'); - $prepareStart = time(); - Console::info("Enqueue proc started at: $time"); + $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; + $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMER - $enqueueDiff); - if (array_key_exists($slot, $queue)) { - $schedule = $queue[$slot]; - console::info(count($schedule) . " functions sent to worker for time slot " . 
$slot); - $totalPreparation = time() - $prepareStart; + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); - $wait = ((60 - FUNCTION_RESET_TIMER_TO) - $totalPreparation); - Console::info("Waiting for : {$wait} seconds"); - sleep($wait); + $total = 0; - $time = DateTime::now(); - Console::info("Start enqueueing at {$time}"); + foreach ($schedules as $key => $schedule) { + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); + $next = DateTime::format($nextDate); - foreach ($schedule as $function) { - if (empty($functions[$function['resourceId']])) { - continue; - } + $currentTick = $next < $timeFrame; - $cron = new CronExpression($function['schedule']); - $next = DateTime::format($cron->getNextRunDate()); - - /** - * If next schedule is in 5-min timeframe - * and it was not removed or changed, re-enqueue the function. - */ - if ( - $next < $timeFrame && - $function['schedule'] ?? [] === $functions[$function['resourceId']]['schedule'] - ) { - $queue[$next][$function['resourceId']] = $function; - } - unset($queue[$slot][$function['resourceId']]); /** removing function from slot */ + if(!$currentTick) { + continue; } - unset($queue[$slot]); /** removing slot */ + + $total++; + + $promiseStart = \microtime(true); // in seconds + $executionStart = $nextDate->getTimestamp(); // in seconds + $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued + + \go(function() use ($executionSleep, $key, &$schedules) { // by reference so removals made by the sync tick are visible after the sleep + \usleep((int) \max(0, $executionSleep * 1000000)); // in microseconds; usleep() needs a non-negative int + + // Ensure schedule was not deleted + if(!isset($schedules[$key])) { + return; + } + + Console::success("Executing function at " . 
DateTime::now()); // TODO: Send to worker queue + }); } + $timerEnd = \microtime(true); - Console::info("Queue timer: finished in " . ($timerEnd - $timerStart) . 
" seconds"); - }); + $lastEnqueueUpdate = $timerStart; + Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); + }; + + Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); + $enqueueFunctions(); } ); }); diff --git a/app/workers/deletes.php b/app/workers/deletes.php index 9ef593dbb9..1980c6dc72 100644 --- a/app/workers/deletes.php +++ b/app/workers/deletes.php @@ -418,6 +418,11 @@ class DeletesV1 extends Worker $dbForProject = $this->getProjectDB($project); $functionId = $document->getId(); + /** + * Delete Schedule + */ + // TODO: DeleteDocument schedules collection + /** * Delete Variables */