1
0
Fork 0
mirror of synced 2024-10-02 10:16:27 +13:00

Linter fix

This commit is contained in:
Matej Baco 2022-11-15 12:32:36 +01:00
parent 1084631d0f
commit 9539e27907

View file

@ -16,8 +16,8 @@ use function Swoole\Coroutine\run;
class Schedule extends Action class Schedule extends Action
{ {
const FUNCTION_UPDATE_TIMER = 10; //seconds public const FUNCTION_UPDATE_TIMER = 10; //seconds
const FUNCTION_ENQUEUE_TIMER = 60; //seconds public const FUNCTION_ENQUEUE_TIMER = 60; //seconds
public static function getName(): string public static function getName(): string
{ {
@ -42,10 +42,10 @@ class Schedule extends Action
{ {
Console::title('Scheduler V1'); Console::title('Scheduler V1');
Console::success(APP_NAME . ' Scheduler v1 has started'); Console::success(APP_NAME . ' Scheduler v1 has started');
/** /**
* Extract only nessessary attributes to lower memory used. * Extract only nessessary attributes to lower memory used.
* *
* @var Document $schedule * @var Document $schedule
* @return array * @return array
*/ */
@ -55,7 +55,7 @@ class Schedule extends Action
[ $database, $reclaim ] = $getProjectDB($project); [ $database, $reclaim ] = $getProjectDB($project);
$function = $database->getDocument('functions', $schedule->getAttribute('resourceId')); $function = $database->getDocument('functions', $schedule->getAttribute('resourceId'));
$reclaim(); $reclaim();
return [ return [
'resourceId' => $schedule->getAttribute('resourceId'), 'resourceId' => $schedule->getAttribute('resourceId'),
'schedule' => $schedule->getAttribute('schedule'), 'schedule' => $schedule->getAttribute('schedule'),
@ -64,16 +64,16 @@ class Schedule extends Action
'function' => $function, 'function' => $function,
]; ];
}; };
$schedules = []; // Local copy of 'schedules' collection $schedules = []; // Local copy of 'schedules' collection
$lastSyncUpdate = DateTime::now(); $lastSyncUpdate = DateTime::now();
$limit = 10000; $limit = 10000;
$sum = $limit; $sum = $limit;
$total = 0; $total = 0;
$loadStart = \microtime(true); $loadStart = \microtime(true);
$latestDocument = null; $latestDocument = null;
while ($sum === $limit) { while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)]; $paginationQueries = [Query::limit($limit)];
if ($latestDocument !== null) { if ($latestDocument !== null) {
@ -84,20 +84,20 @@ class Schedule extends Action
Query::equal('resourceType', ['function']), Query::equal('resourceType', ['function']),
Query::equal('active', [true]), Query::equal('active', [true]),
])); ]));
$sum = count($results); $sum = count($results);
$total = $total + $sum; $total = $total + $sum;
foreach ($results as $document) { foreach ($results as $document) {
$schedules[$document['resourceId']] = $getSchedule($document); $schedules[$document['resourceId']] = $getSchedule($document);
} }
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
} }
Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now()); Console::success("Starting timers at " . DateTime::now());
run( run(
function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) {
/** /**
@ -106,14 +106,14 @@ class Schedule extends Action
Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) {
$time = DateTime::now(); $time = DateTime::now();
$timerStart = \microtime(true); $timerStart = \microtime(true);
$limit = 1000; $limit = 1000;
$sum = $limit; $sum = $limit;
$total = 0; $total = 0;
$latestDocument = null; $latestDocument = null;
Console::log("Sync tick: Running at $time"); Console::log("Sync tick: Running at $time");
while ($sum === $limit) { while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)]; $paginationQueries = [Query::limit($limit)];
if ($latestDocument !== null) { if ($latestDocument !== null) {
@ -124,15 +124,15 @@ class Schedule extends Action
Query::equal('resourceType', ['function']), Query::equal('resourceType', ['function']),
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
])); ]));
$sum = count($results); $sum = count($results);
$total = $total + $sum; $total = $total + $sum;
foreach ($results as $document) { foreach ($results as $document) {
$localDocument = $schedules[$document['resourceId']] ?? null; $localDocument = $schedules[$document['resourceId']] ?? null;
$org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null;
$new = strtotime($document['resourceUpdatedAt']); $new = strtotime($document['resourceUpdatedAt']);
if ($document['active'] === false) { if ($document['active'] === false) {
Console::info("Removing: {$document['resourceId']}"); Console::info("Removing: {$document['resourceId']}");
unset($schedules[$document['resourceId']]); unset($schedules[$document['resourceId']]);
@ -143,13 +143,13 @@ class Schedule extends Action
} }
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
} }
$lastSyncUpdate = $time; $lastSyncUpdate = $time;
$timerEnd = \microtime(true); $timerEnd = \microtime(true);
Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds");
}); });
/** /**
* The timer to prepare soon-to-execute schedules. * The timer to prepare soon-to-execute schedules.
*/ */
@ -157,62 +157,62 @@ class Schedule extends Action
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) {
$timerStart = \microtime(true); $timerStart = \microtime(true);
$time = DateTime::now(); $time = DateTime::now();
$enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate;
$timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff);
Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");
$total = 0; $total = 0;
$delayedExecutions = []; // Group executions with same delay to share one coroutine $delayedExecutions = []; // Group executions with same delay to share one coroutine
foreach ($schedules as $key => $schedule) { foreach ($schedules as $key => $schedule) {
$cron = new CronExpression($schedule['schedule']); $cron = new CronExpression($schedule['schedule']);
$nextDate = $cron->getNextRunDate(); $nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate); $next = DateTime::format($nextDate);
$currentTick = $next < $timeFrame; $currentTick = $next < $timeFrame;
if(!$currentTick) { if (!$currentTick) {
continue; continue;
} }
$total++; $total++;
$promiseStart = \microtime(true); // in seconds $promiseStart = \microtime(true); // in seconds
$executionStart = $nextDate->getTimestamp(); // in seconds $executionStart = $nextDate->getTimestamp(); // in seconds
$executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
$delay = \ceil(\intval($executionSleep)); $delay = \ceil(\intval($executionSleep));
if(!isset($delayedExecutions[$delay])) { if (!isset($delayedExecutions[$delay])) {
$delayedExecutions[$delay] = []; $delayedExecutions[$delay] = [];
} }
$delayedExecutions[$delay][] = $key; $delayedExecutions[$delay][] = $key;
} }
foreach($delayedExecutions as $delay => $scheduleKeys) { foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function() use ($delay, $schedules, $scheduleKeys) { \go(function () use ($delay, $schedules, $scheduleKeys) {
\sleep($delay); // in seconds \sleep($delay); // in seconds
foreach($scheduleKeys as $scheduleKey) { foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted // Ensure schedule was not deleted
if(!isset($schedules[$scheduleKey])) { if (!isset($schedules[$scheduleKey])) {
return; return;
} }
Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue
} }
}); });
} }
$timerEnd = \microtime(true); $timerEnd = \microtime(true);
$lastEnqueueUpdate = $timerStart; $lastEnqueueUpdate = $timerStart;
Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds");
}; };
Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions());
$enqueueFunctions(); $enqueueFunctions();
} }