queue
This commit is contained in:
parent
6f9dcae858
commit
9ed6ed69fb
|
@ -1,4 +1,5 @@
|
||||||
<?php
|
<?php
|
||||||
|
|
||||||
ini_set('memory_limit', -1);
|
ini_set('memory_limit', -1);
|
||||||
ini_set('max_execution_time', -1);
|
ini_set('max_execution_time', -1);
|
||||||
global $cli;
|
global $cli;
|
||||||
|
@ -17,6 +18,20 @@ const ENQUEUE_TIME_FRAME = 60 * 5; // 5 min
|
||||||
sleep(4); // Todo prevent PDOException
|
sleep(4); // Todo prevent PDOException
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 1. first load from db with limit+offset --line 82--
|
||||||
|
* 2. creating a 5-min offset array ($queue) --line 102--
|
||||||
|
* 3. First timer runs every minute, looping over $queue time slots (each slot is 1-min delta)
|
||||||
|
* if the function matches the current minute it should be dispatched to the functions worker.
|
||||||
|
* Then another translation is made to the cron pattern if it is in the next 5-min window
|
||||||
|
* it is assigned again to the $queue. --line 172--.
|
||||||
|
* 4. Second timer runs every X min and updates the $functions (large) list.
|
||||||
|
* The query fetches only functions that [resourceUpdatedAt] attr changed from the
|
||||||
|
* last time the timer that was fired (X min) --line 120--
|
||||||
|
* If the function was deleted it is unsets from the list ($functions) and the $queue.
|
||||||
|
* In the end of the timer the $queue is created again.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
$cli
|
$cli
|
||||||
->task('schedule')
|
->task('schedule')
|
||||||
|
@ -40,7 +55,6 @@ $cli
|
||||||
}
|
}
|
||||||
$loadEnd = \microtime(true);
|
$loadEnd = \microtime(true);
|
||||||
Console::error("Queue was built in " . ($loadEnd - $loadStart) . " seconds");
|
Console::error("Queue was built in " . ($loadEnd - $loadStart) . " seconds");
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
$removeFromQueue = function ($scheduleId) use (&$queue) {
|
$removeFromQueue = function ($scheduleId) use (&$queue) {
|
||||||
|
@ -54,7 +68,6 @@ $cli
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
$dbForConsole = getConsoleDB();
|
$dbForConsole = getConsoleDB();
|
||||||
$limit = 200;
|
$limit = 200;
|
||||||
$sum = $limit;
|
$sum = $limit;
|
||||||
|
@ -76,7 +89,6 @@ $cli
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$sum = count($results);
|
$sum = count($results);
|
||||||
|
|
||||||
$total = $total + $sum;
|
$total = $total + $sum;
|
||||||
foreach ($results as $document) {
|
foreach ($results as $document) {
|
||||||
$functions[$document['resourceId']] = $document;
|
$functions[$document['resourceId']] = $document;
|
||||||
|
@ -96,8 +108,10 @@ $cli
|
||||||
Timer::tick(FUNCTION_VALIDATION_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
Timer::tick(FUNCTION_VALIDATION_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||||
$time = DateTime::now();
|
$time = DateTime::now();
|
||||||
$count = 0;
|
$count = 0;
|
||||||
$limit = 50;
|
$limit = 200;
|
||||||
$sum = $limit;
|
$sum = $limit;
|
||||||
|
$total = 0;
|
||||||
|
$timerStart = \microtime(true);
|
||||||
|
|
||||||
Console::info("Update proc run at: $time last update was at $lastUpdate");
|
Console::info("Update proc run at: $time last update was at $lastUpdate");
|
||||||
/**
|
/**
|
||||||
|
@ -112,6 +126,7 @@ $cli
|
||||||
Query::offset($count * $limit),
|
Query::offset($count * $limit),
|
||||||
]);
|
]);
|
||||||
$sum = count($results);
|
$sum = count($results);
|
||||||
|
$total = $total + $sum;
|
||||||
foreach ($results as $document) {
|
foreach ($results as $document) {
|
||||||
$org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null;
|
$org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null;
|
||||||
$new = strtotime($document['resourceUpdatedAt']);
|
$new = strtotime($document['resourceUpdatedAt']);
|
||||||
|
@ -126,24 +141,27 @@ $cli
|
||||||
}
|
}
|
||||||
$count++;
|
$count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
$lastUpdate = DateTime::now();
|
$lastUpdate = DateTime::now();
|
||||||
$createQueue();
|
$createQueue();
|
||||||
|
$timerEnd = \microtime(true);
|
||||||
|
|
||||||
|
Console::error("Update timer: {$total} functions where updated in " . ($timerStart - $timerEnd) . " seconds");
|
||||||
});
|
});
|
||||||
|
|
||||||
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) {
|
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) {
|
||||||
|
$timerStart = \microtime(true);
|
||||||
$time = DateTime::now();
|
$time = DateTime::now();
|
||||||
$timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME); /** 5 min */
|
$timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME); /** 5 min */
|
||||||
$now = (new \DateTime())->format('Y-m-d H:i:00.000');
|
$now = (new \DateTime())->format('Y-m-d H:i:00.000');
|
||||||
|
|
||||||
Console::info("Enqueue proc run at: $time");
|
Console::info("Enqueue proc run at: $time");
|
||||||
// Debug
|
// Debug
|
||||||
foreach ($queue as $slot => $schedule) {
|
// foreach ($queue as $slot => $schedule) {
|
||||||
Console::log("Slot: $slot");
|
// Console::log("Slot: $slot");
|
||||||
foreach ($schedule as $function) {
|
// foreach ($schedule as $function) {
|
||||||
Console::log("{$function['resourceId']} {$function['schedule']}");
|
// Console::log("{$function['resourceId']} {$function['schedule']}");
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lopping time slots
|
* Lopping time slots
|
||||||
|
@ -153,11 +171,12 @@ $cli
|
||||||
if ($now === $slot) {
|
if ($now === $slot) {
|
||||||
foreach ($schedule as $function) {
|
foreach ($schedule as $function) {
|
||||||
/**
|
/**
|
||||||
* Enqueue function
|
* Enqueue function (here should be the Enqueue call
|
||||||
*/
|
*/
|
||||||
Console::warning("Enqueueing :{$function['resourceId']}");
|
Console::warning("Enqueueing :{$function['resourceId']}");
|
||||||
$cron = new CronExpression($function['schedule']);
|
$cron = new CronExpression($function['schedule']);
|
||||||
$next = DateTime::format($cron->getNextRunDate());
|
$next = DateTime::format($cron->getNextRunDate());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If next schedule is in 5-min timeframe
|
* If next schedule is in 5-min timeframe
|
||||||
* and it was not removed or changed, re-enqueue the function.
|
* and it was not removed or changed, re-enqueue the function.
|
||||||
|
@ -175,6 +194,8 @@ $cli
|
||||||
unset($queue[$slot]); /** removing slot */
|
unset($queue[$slot]); /** removing slot */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
$timerEnd = \microtime(true);
|
||||||
|
Console::error("Queue timer: finished in " . ($timerStart - $timerEnd) . " seconds");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in a new issue