From 9ed6ed69fb60ab1ccc6dad66bd844724b6e034b8 Mon Sep 17 00:00:00 2001 From: shimon Date: Mon, 7 Nov 2022 13:22:52 +0200 Subject: [PATCH] queue --- app/tasks/schedule.php | 45 +++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index b86d5a286d..2fc7acd1e9 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -1,4 +1,5 @@ task('schedule') @@ -40,7 +55,6 @@ $cli } $loadEnd = \microtime(true); Console::error("Queue was built in " . ($loadEnd - $loadStart) . " seconds"); - }; $removeFromQueue = function ($scheduleId) use (&$queue) { @@ -54,7 +68,6 @@ $cli } }; - $dbForConsole = getConsoleDB(); $limit = 200; $sum = $limit; @@ -76,7 +89,6 @@ $cli ]); $sum = count($results); - $total = $total + $sum; foreach ($results as $document) { $functions[$document['resourceId']] = $document; @@ -96,8 +108,10 @@ $cli Timer::tick(FUNCTION_VALIDATION_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { $time = DateTime::now(); $count = 0; - $limit = 50; + $limit = 200; $sum = $limit; + $total = 0; + $timerStart = \microtime(true); Console::info("Update proc run at: $time last update was at $lastUpdate"); /** @@ -112,6 +126,7 @@ $cli Query::offset($count * $limit), ]); $sum = count($results); + $total = $total + $sum; foreach ($results as $document) { $org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null; $new = strtotime($document['resourceUpdatedAt']); @@ -126,24 +141,27 @@ $cli } $count++; } - $lastUpdate = DateTime::now(); $createQueue(); + $timerEnd = \microtime(true); + + Console::error("Update timer: {$total} functions where updated in " . ($timerStart - $timerEnd) . " seconds"); }); Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) { + $timerStart = \microtime(true); $time = DateTime::now(); $timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME); /** 5 min */ $now = (new \DateTime())->format('Y-m-d H:i:00.000'); Console::info("Enqueue proc run at: $time"); // Debug - foreach ($queue as $slot => $schedule) { - Console::log("Slot: $slot"); - foreach ($schedule as $function) { - Console::log("{$function['resourceId']} {$function['schedule']}"); - } - } + // foreach ($queue as $slot => $schedule) { + // Console::log("Slot: $slot"); + // foreach ($schedule as $function) { + // Console::log("{$function['resourceId']} {$function['schedule']}"); + // } + // } /** * Lopping time slots @@ -153,11 +171,12 @@ $cli if ($now === $slot) { foreach ($schedule as $function) { /** - * Enqueue function + * Enqueue function (here should be the Enqueue call */ Console::warning("Enqueueing :{$function['resourceId']}"); $cron = new CronExpression($function['schedule']); $next = DateTime::format($cron->getNextRunDate()); + /** * If next schedule is in 5-min timeframe * and it was not removed or changed, re-enqueue the function. @@ -175,6 +194,8 @@ $cli unset($queue[$slot]); /** removing slot */ } } + $timerEnd = \microtime(true); + Console::error("Queue timer: finished in " . ($timerStart - $timerEnd) . " seconds"); }); } );