1
0
Fork 0
mirror of synced 2024-09-28 23:41:23 +12:00

feat: use coroutines delay system

This commit is contained in:
loks0n 2024-07-18 13:03:24 +01:00
parent 31d4932fdf
commit f02bf6ac5a

View file

@ -26,6 +26,7 @@ class ScheduleExecutions extends ScheduleBase
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@ -38,25 +39,29 @@ class ScheduleExecutions extends ScheduleBase
continue;
}
$now = new \DateTime();
$scheduledAt = new \DateTime($schedule['schedule']);
if ($scheduledAt > $now) {
if ($scheduledAt <= $intervalEnd) {
continue;
}
$queueForFunctions
->setType('schedule')
// Set functionId instead of function as we don't have $dbForProject
// TODO: Refactor to use function instead of functionId
->setFunctionId($schedule['resource']['functionId'])
->setExecution($schedule['resource'])
->setMethod($schedule['data']['method'] ?? 'POST')
->setPath($schedule['data']['path'] ?? '/')
->setHeaders($schedule['data']['headers'] ?? [])
->setBody($schedule['data']['body'] ?? '')
->setProject($schedule['project'])
->trigger();
$delay = $scheduledAt->getTimestamp() - (new \DateTime())->getTimestamp();
\go(function () use ($queueForFunctions, $schedule, $delay) {
\sleep($delay);
$queueForFunctions
->setType('schedule')
// Set functionId instead of function as we don't have $dbForProject
// TODO: Refactor to use function instead of functionId
->setFunctionId($schedule['resource']['functionId'])
->setExecution($schedule['resource'])
->setMethod($schedule['data']['method'] ?? 'POST')
->setPath($schedule['data']['path'] ?? '/')
->setHeaders($schedule['data']['headers'] ?? [])
->setBody($schedule['data']['body'] ?? '')
->setProject($schedule['project'])
->trigger();
});
$dbForConsole->deleteDocument(
'schedules',