1
0
Fork 0
mirror of synced 2024-07-12 01:45:54 +12:00

Improve schedule accuracy + simplify

This commit is contained in:
Matej Bačo 2022-11-14 09:29:30 +00:00
parent 250ea93d3f
commit 5e7de81966
2 changed files with 78 additions and 145 deletions

View file

@ -11,98 +11,27 @@ use Utopia\Database\Document;
use Utopia\Database\Query; use Utopia\Database\Query;
use Swoole\Timer; use Swoole\Timer;
const FUNCTION_UPDATE_TIMER = 60; //seconds const FUNCTION_UPDATE_TIMER = 10; //seconds
const FUNCTION_ENQUEUE_TIMER = 60; //seconds const FUNCTION_ENQUEUE_TIMER = 10; //seconds
const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min
const FUNCTION_RESET_TIMER_TO = 50; // seconds
sleep(4);
/** /**
* 1. first load from db with limit+offset * 1. Load all documents from 'schedules' collection to create local copy
* 2. creating a 5-min offset array ($queue) * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. First timer runs every minute, looping over $queue time slots (each slot is 1-min delta) * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker.
* if the function matches the current minute it should be dispatched to the functions worker.
* Then another translation is made to the cron pattern if it is in the next 5-min window
* it is assigned again to the $queue. .
* 4. Second timer runs every X min and updates the $functions (large) list.
* The query fetches only functions that [resourceUpdatedAt] attr changed from the
* last time the timer that was fired (X min)
* If the function was deleted it is unsets from the list ($functions) and the $queue.
* In the end of the timer the $queue is created again.
*
*/ */
$cli $cli
->task('schedule') ->task('schedule')
->desc('Function scheduler task') ->desc('Function scheduler task')
->action(function () use ($register) { ->action(function () {
Console::title('Scheduler V1'); Console::title('Scheduler V1');
Console::success(APP_NAME . ' Scheduler v1 has started'); Console::success(APP_NAME . ' Scheduler v1 has started');
$dbForConsole = getConsoleDB(); $dbForConsole = getConsoleDB();
/** /**
* @return void * Extract only nessessary attributes to lower memory used.
*/ *
$createQueue = function () use (&$functions, &$queue): void {
$loadStart = \microtime(true);
/**
* Creating smaller functions list containing 5-min timeframe.
*/
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME);
foreach ($functions as $function) {
$cron = new CronExpression($function['schedule']);
$next = DateTime::format($cron->getNextRunDate());
if ($next < $timeFrame) {
$queue[$next][$function['resourceId']] = $function;
}
}
$loadEnd = \microtime(true);
Console::success("Queue was built in " . ($loadEnd - $loadStart) . " seconds");
//var_dump($queue);
};
/**
* @param string $id
* @param string $resourceId
* @return void
*/
$removeFromQueue = function (string $id, string $resourceId) use (&$queue, &$functions, $dbForConsole) {
if (array_key_exists($resourceId, $functions)) {
unset($functions[$resourceId]);
$dbForConsole->deleteDocument('schedules', $id);
Console::error("Removing :{$resourceId} from functions list");
}
foreach ($queue as $slot => $schedule) {
if (array_key_exists($resourceId, $schedule)) {
unset($queue[$slot][$resourceId]);
Console::error("Removing :{$resourceId} from queue slot $slot");
}
}
};
/**
* @param string $resourceId
* @param array $update
* @return void
*/
$updateQueue = function (string $resourceId, array $update) use (&$queue, &$functions): void {
$functions[$resourceId] = $update;
Console::error("Updating :{$resourceId} in functions list");
foreach ($queue as $slot => $schedule) {
if (array_key_exists($resourceId, $schedule)) {
$queue[$slot][$resourceId] = $update;
Console::error("Updating :{$resourceId} in queue slot $slot");
}
}
};
/**
* @var Document $schedule * @var Document $schedule
* @return array * @return array
*/ */
@ -115,10 +44,11 @@ $cli
]; ];
} }
$schedules = []; // Local copy of 'schedules' collection
$lastSyncUpdate = DateTime::now();
$limit = 10000; $limit = 10000;
$sum = $limit; $sum = $limit;
$functions = [];
$queue = [];
$total = 0; $total = 0;
$loadStart = \microtime(true); $loadStart = \microtime(true);
$latestDocument = null; $latestDocument = null;
@ -137,39 +67,33 @@ $cli
$sum = count($results); $sum = count($results);
$total = $total + $sum; $total = $total + $sum;
foreach ($results as $document) { foreach ($results as $document) {
$functions[$document['resourceId']] = getsSheduleAttributes($document); $schedules[$document['resourceId']] = getsSheduleAttributes($document);
} }
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
} }
$loadEnd = \microtime(true); $loadEnd = \microtime(true);
Console::success("{$total} functions where loaded in " . ($loadEnd - $loadStart) . " seconds"); Console::success("{$total} schedules where loaded in " . ($loadEnd - $loadStart) . " seconds");
$createQueue();
$lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_UPDATE_TIMER);
do {
$second = time() % 60;
} while ($second < FUNCTION_RESET_TIMER_TO);
$time = DateTime::now(); $time = DateTime::now();
Console::success("Starting timers at {$time}"); Console::success("Starting timers at {$time}");
/**
* The timer updates $functions from db on last resourceUpdatedAt attr in X-min.
*/
Co\run( Co\run(
function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { function () use ($dbForConsole, &$schedules, &$lastSyncUpdate) {
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { /**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate) {
$time = DateTime::now(); $time = DateTime::now();
$timerStart = \microtime(true);
$limit = 1000; $limit = 1000;
$sum = $limit; $sum = $limit;
$total = 0; $total = 0;
$latestDocument = null; $latestDocument = null;
$timerStart = \microtime(true);
//Console::warning("Update proc started at: $time last update was at $lastUpdate"); Console::log("Sync tick: Running at $time");
while ($sum === $limit) { while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)]; $paginationQueries = [Query::limit($limit)];
@ -179,82 +103,86 @@ $cli
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [App::getEnv('_APP_REGION')]), Query::equal('region', [App::getEnv('_APP_REGION')]),
Query::equal('resourceType', ['function']), Query::equal('resourceType', ['function']),
Query::greaterThanEqual('resourceUpdatedAt', $lastUpdate), Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
])); ]));
$sum = count($results); $sum = count($results);
$total = $total + $sum; $total = $total + $sum;
foreach ($results as $document) { foreach ($results as $document) {
$org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null; $localDocument = $schedules[$document['resourceId']] ?? null;
$org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null;
$new = strtotime($document['resourceUpdatedAt']); $new = strtotime($document['resourceUpdatedAt']);
if ($document['active'] === false) {
//Console::warning("Removing: {$document['resourceId']}"); if ($$document['active'] === false) {
$removeFromQueue($document->getId(), $document['resourceId']); Console::info("Removing: {$document['resourceId']}");
} elseif ($new > $org) { unset($schedules[$document['resourceId']]);
//Console::warning("Updating: {$document['resourceId']}"); } elseif ($new !== $org) {
$updateQueue($document['resourceId'], getsSheduleAttributes($document)); Console::info("Updating: {$document['resourceId']}");
$schedules[$document['resourceId']] = getsSheduleAttributes($document);
} }
} }
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
} }
$lastUpdate = DateTime::now(); $lastSyncUpdate = $time;
$createQueue();
$timerEnd = \microtime(true); $timerEnd = \microtime(true);
//Console::warning("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds"); Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds");
}); });
/** /**
* The timer sends to worker every 1 min and re-enqueue matched functions. * The timer to prepare soon-to-execute schedules.
*/ */
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) { $lastEnqueueUpdate = null;
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) {
$timerStart = \microtime(true); $timerStart = \microtime(true);
$time = DateTime::now(); $time = DateTime::now();
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME);
$slot = (new \DateTime())->format('Y-m-d H:i:00.000');
$prepareStart = time();
Console::info("Enqueue proc started at: $time"); $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate;
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMER - $enqueueDiff);
if (array_key_exists($slot, $queue)) { Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");
$schedule = $queue[$slot];
console::info(count($schedule) . " functions sent to worker for time slot " . $slot);
$totalPreparation = time() - $prepareStart;
$wait = ((60 - FUNCTION_RESET_TIMER_TO) - $totalPreparation); $total = 0;
Console::info("Waiting for : {$wait} seconds");
sleep($wait);
$time = DateTime::now(); foreach ($schedules as $key => $schedule) {
Console::info("Start enqueueing at {$time}"); $cron = new CronExpression($schedule['schedule']);
$nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate);
foreach ($schedule as $function) { $currentTick = $next < $timeFrame;
if (empty($functions[$function['resourceId']])) {
if(!$currentTick) {
continue; continue;
} }
$cron = new CronExpression($function['schedule']); $total++;
$next = DateTime::format($cron->getNextRunDate());
/** $promiseStart = \microtime(true); // in seconds
* If next schedule is in 5-min timeframe $executionStart = $nextDate->getTimestamp(); // in seconds
* and it was not removed or changed, re-enqueue the function. $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
*/
if ( \go(function() use ($executionSleep, $key, $schedules) {
$next < $timeFrame && \usleep($executionSleep * 1000000); // in microseconds
$function['schedule'] ?? [] === $functions[$function['resourceId']]['schedule']
) { // Ensure schedule was not deleted
$queue[$next][$function['resourceId']] = $function; if(!isset($schedules[$key])) {
return;
} }
unset($queue[$slot][$function['resourceId']]); /** removing function from slot */
} Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue
unset($queue[$slot]); /** removing slot */
}
$timerEnd = \microtime(true);
Console::info("Queue timer: finished in " . ($timerEnd - $timerStart) . " seconds");
}); });
} }
$timerEnd = \microtime(true);
$lastEnqueueUpdate = $timerStart;
Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds");
};
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions());
$enqueueFunctions();
}
); );
}); });

View file

@ -418,6 +418,11 @@ class DeletesV1 extends Worker
$dbForProject = $this->getProjectDB($project); $dbForProject = $this->getProjectDB($project);
$functionId = $document->getId(); $functionId = $document->getId();
/**
* Delete Schedule
*/
// TODO: DeleteDocument schedules collection
/** /**
* Delete Variables * Delete Variables
*/ */