1
0
Fork 0
mirror of synced 2024-09-29 17:01:37 +13:00

some fixes

This commit is contained in:
shimon 2022-11-13 15:14:00 +02:00
parent 9e3b9b8e59
commit 250ea93d3f

View file

@ -7,12 +7,14 @@ use Cron\CronExpression;
use Utopia\App; use Utopia\App;
use Utopia\CLI\Console; use Utopia\CLI\Console;
use Utopia\Database\DateTime; use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Query; use Utopia\Database\Query;
use Swoole\Timer; use Swoole\Timer;
const FUNCTION_UPDATE_TIMER = 60; //seconds const FUNCTION_UPDATE_TIMER = 60; //seconds
const FUNCTION_ENQUEUE_TIMER = 60; //seconds const FUNCTION_ENQUEUE_TIMER = 60; //seconds
const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min
const FUNCTION_RESET_TIMER_TO = 50; // seconds
sleep(4); sleep(4);
@ -38,7 +40,12 @@ $cli
Console::title('Scheduler V1'); Console::title('Scheduler V1');
Console::success(APP_NAME . ' Scheduler v1 has started'); Console::success(APP_NAME . ' Scheduler v1 has started');
$createQueue = function () use (&$functions, &$queue) { $dbForConsole = getConsoleDB();
/**
* @return void
*/
$createQueue = function () use (&$functions, &$queue): void {
$loadStart = \microtime(true); $loadStart = \microtime(true);
/** /**
* Creating smaller functions list containing 5-min timeframe. * Creating smaller functions list containing 5-min timeframe.
@ -47,24 +54,67 @@ $cli
foreach ($functions as $function) { foreach ($functions as $function) {
$cron = new CronExpression($function['schedule']); $cron = new CronExpression($function['schedule']);
$next = DateTime::format($cron->getNextRunDate()); $next = DateTime::format($cron->getNextRunDate());
if ($next < $timeFrame) { if ($next < $timeFrame) {
$queue[$next][$function['resourceId']] = $function; $queue[$next][$function['resourceId']] = $function;
} }
} }
$loadEnd = \microtime(true); $loadEnd = \microtime(true);
Console::success("Queue was built in " . ($loadEnd - $loadStart) . " seconds"); Console::success("Queue was built in " . ($loadEnd - $loadStart) . " seconds");
//var_dump($queue);
}; };
$removeFromQueue = function ($resourceId) use (&$queue) { /**
* @param string $id
* @param string $resourceId
* @return void
*/
$removeFromQueue = function (string $id, string $resourceId) use (&$queue, &$functions, $dbForConsole) {
if (array_key_exists($resourceId, $functions)) {
unset($functions[$resourceId]);
$dbForConsole->deleteDocument('schedules', $id);
Console::error("Removing :{$resourceId} from functions list");
}
foreach ($queue as $slot => $schedule) { foreach ($queue as $slot => $schedule) {
if (array_key_exists($resourceId, $schedule)) { if (array_key_exists($resourceId, $schedule)) {
Console::error("Unsetting :{$resourceId} from queue slot $slot");
unset($queue[$slot][$resourceId]); unset($queue[$slot][$resourceId]);
Console::error("Removing :{$resourceId} from queue slot $slot");
} }
} }
}; };
$dbForConsole = getConsoleDB(); /**
* @param string $resourceId
* @param array $update
* @return void
*/
$updateQueue = function (string $resourceId, array $update) use (&$queue, &$functions): void {
$functions[$resourceId] = $update;
Console::error("Updating :{$resourceId} in functions list");
foreach ($queue as $slot => $schedule) {
if (array_key_exists($resourceId, $schedule)) {
$queue[$slot][$resourceId] = $update;
Console::error("Updating :{$resourceId} in queue slot $slot");
}
}
};
/**
* @var Document $schedule
* @return array
*/
function getsSheduleAttributes(Document $schedule): array
{
return [
'resourceId' => $schedule->getAttribute('resourceId'),
'schedule' => $schedule->getAttribute('schedule'),
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
];
}
$limit = 10000; $limit = 10000;
$sum = $limit; $sum = $limit;
$functions = []; $functions = [];
@ -87,11 +137,7 @@ $cli
$sum = count($results); $sum = count($results);
$total = $total + $sum; $total = $total + $sum;
foreach ($results as $document) { foreach ($results as $document) {
$functions[$document['resourceId']] = [ $functions[$document['resourceId']] = getsSheduleAttributes($document);
'resourceId' => $document->getAttribute('resourceId'),
'schedule' => $document->getAttribute('schedule'),
'resourceUpdatedAt' => $document->getAttribute('resourceUpdatedAt'),
];
} }
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
@ -102,12 +148,20 @@ $cli
$createQueue(); $createQueue();
$lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_UPDATE_TIMER); $lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_UPDATE_TIMER);
do {
$second = time() % 60;
} while ($second < FUNCTION_RESET_TIMER_TO);
$time = DateTime::now();
Console::success("Starting timers at {$time}");
/** /**
* The timer updates $functions from db on last resourceUpdatedAt attr in X-min. * The timer updates $functions from db on last resourceUpdatedAt attr in X-min.
*/ */
Co\run( Co\run(
function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
$time = DateTime::now(); $time = DateTime::now();
$limit = 1000; $limit = 1000;
$sum = $limit; $sum = $limit;
@ -115,7 +169,7 @@ $cli
$latestDocument = null; $latestDocument = null;
$timerStart = \microtime(true); $timerStart = \microtime(true);
Console::warning("Update proc started at: $time last update was at $lastUpdate"); //Console::warning("Update proc started at: $time last update was at $lastUpdate");
while ($sum === $limit) { while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)]; $paginationQueries = [Query::limit($limit)];
@ -125,7 +179,7 @@ $cli
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [App::getEnv('_APP_REGION')]), Query::equal('region', [App::getEnv('_APP_REGION')]),
Query::equal('resourceType', ['function']), Query::equal('resourceType', ['function']),
Query::greaterThan('resourceUpdatedAt', $lastUpdate), Query::greaterThanEqual('resourceUpdatedAt', $lastUpdate),
])); ]));
$sum = count($results); $sum = count($results);
@ -134,17 +188,12 @@ $cli
$org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null; $org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null;
$new = strtotime($document['resourceUpdatedAt']); $new = strtotime($document['resourceUpdatedAt']);
if ($document['active'] === false) { if ($document['active'] === false) {
Console::warning("Removing: {$document['resourceId']}"); //Console::warning("Removing: {$document['resourceId']}");
unset($functions[$document['resourceId']]); $removeFromQueue($document->getId(), $document['resourceId']);
} elseif ($new > $org) { } elseif ($new > $org) {
Console::warning("Updating: {$document['resourceId']}"); //Console::warning("Updating: {$document['resourceId']}");
$functions[$document['resourceId']] = [ $updateQueue($document['resourceId'], getsSheduleAttributes($document));
'resourceId' => $document->getAttribute('resourceId'),
'schedule' => $document->getAttribute('schedule'),
'resourceUpdatedAt' => $document->getAttribute('resourceUpdatedAt'),
];
} }
$removeFromQueue($document['resourceId']);
} }
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
@ -154,7 +203,7 @@ $cli
$createQueue(); $createQueue();
$timerEnd = \microtime(true); $timerEnd = \microtime(true);
Console::warning("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds"); //Console::warning("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds");
}); });
/** /**
@ -165,17 +214,27 @@ $cli
$time = DateTime::now(); $time = DateTime::now();
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME); $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME);
$slot = (new \DateTime())->format('Y-m-d H:i:00.000'); $slot = (new \DateTime())->format('Y-m-d H:i:00.000');
$prepareStart = time();
Console::info("Enqueue proc started at: $time"); Console::info("Enqueue proc started at: $time");
if (array_key_exists($slot, $queue)) { if (array_key_exists($slot, $queue)) {
$schedule = $queue[$slot]; $schedule = $queue[$slot];
console::info(count($schedule) . " functions sent to worker for time slot " . $slot); console::info(count($schedule) . " functions sent to worker for time slot " . $slot);
$totalPreparation = time() - $prepareStart;
$wait = ((60 - FUNCTION_RESET_TIMER_TO) - $totalPreparation);
Console::info("Waiting for : {$wait} seconds");
sleep($wait);
$time = DateTime::now();
Console::info("Start enqueueing at {$time}");
foreach ($schedule as $function) { foreach ($schedule as $function) {
/** if (empty($functions[$function['resourceId']])) {
* Enqueue function (here should be the Enqueue call continue;
*/ }
$cron = new CronExpression($function['schedule']); $cron = new CronExpression($function['schedule']);
$next = DateTime::format($cron->getNextRunDate()); $next = DateTime::format($cron->getNextRunDate());
@ -185,8 +244,7 @@ $cli
*/ */
if ( if (
$next < $timeFrame && $next < $timeFrame &&
!empty($functions[$function['resourceId']] && $function['schedule'] ?? [] === $functions[$function['resourceId']]['schedule']
$function['schedule'] === $functions[$function['resourceId']]['schedule'])
) { ) {
$queue[$next][$function['resourceId']] = $function; $queue[$next][$function['resourceId']] = $function;
} }