addressing comments
This commit is contained in:
parent
9d641b985a
commit
aef565c8ff
4 changed files with 39 additions and 45 deletions
|
@ -2272,7 +2272,7 @@ $collections = [
|
|||
'filters' => [],
|
||||
],
|
||||
[
|
||||
'$id' => ID::custom('scheduleUpdatedAt'), // Used to fix duplicate executions bug. Can be removed once new queue library is used
|
||||
'$id' => ID::custom('scheduleUpdatedAt'),
|
||||
'type' => Database::VAR_DATETIME,
|
||||
'format' => '',
|
||||
'size' => 0,
|
||||
|
|
|
@ -89,7 +89,7 @@ App::post('/v1/functions')
|
|||
'search' => implode(' ', [$functionId, $name, $runtime])
|
||||
]));
|
||||
|
||||
$log = Authorization::skip(
|
||||
$schedule = Authorization::skip(
|
||||
fn() => $dbForConsole->createDocument('schedules', new Document([
|
||||
'region' => App::getEnv('_APP_REGION'), // Todo replace with projects region
|
||||
'resourceType' => 'function',
|
||||
|
@ -101,7 +101,7 @@ App::post('/v1/functions')
|
|||
]))
|
||||
);
|
||||
|
||||
$function->setAttribute('scheduleId', $log->getId());
|
||||
$function->setAttribute('scheduleId', $schedule->getId());
|
||||
$dbForProject->updateDocument('functions', $function->getId(), $function);
|
||||
|
||||
$eventsInstance->setParam('functionId', $function->getId());
|
||||
|
@ -469,21 +469,21 @@ App::put('/v1/functions/:functionId')
|
|||
'search' => implode(' ', [$functionId, $name, $function->getAttribute('runtime')]),
|
||||
])));
|
||||
|
||||
$log = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
$schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
|
||||
/**
|
||||
* In case we want to clear the schedule
|
||||
*/
|
||||
if (!empty($function->getAttribute('deployment'))) {
|
||||
$log->setAttribute('resourceUpdatedAt', $function['scheduleUpdatedAt']);
|
||||
$schedule->setAttribute('resourceUpdatedAt', $function['scheduleUpdatedAt']);
|
||||
}
|
||||
|
||||
$log
|
||||
$schedule
|
||||
->setAttribute('schedule', $function->getAttribute('schedule'))
|
||||
->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment')));
|
||||
|
||||
|
||||
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
|
||||
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
|
||||
|
||||
$eventsInstance->setParam('functionId', $function->getId());
|
||||
|
||||
|
@ -537,18 +537,18 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId')
|
|||
'deployment' => $deployment->getId()
|
||||
])));
|
||||
|
||||
$log = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
$schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
|
||||
$active = !empty($function->getAttribute('schedule'));
|
||||
|
||||
if ($active) {
|
||||
$log->setAttribute('resourceUpdatedAt', datetime::now());
|
||||
$schedule->setAttribute('resourceUpdatedAt', datetime::now());
|
||||
}
|
||||
|
||||
$log->setAttribute('active', $active);
|
||||
$schedule->setAttribute('active', $active);
|
||||
|
||||
Authorization::skip(function () use ($dbForConsole, $log) {
|
||||
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
|
||||
Authorization::skip(function () use ($dbForConsole, $schedule) {
|
||||
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
|
||||
});
|
||||
|
||||
$events
|
||||
|
@ -590,15 +590,15 @@ App::delete('/v1/functions/:functionId')
|
|||
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB');
|
||||
}
|
||||
|
||||
$log = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
$schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
|
||||
$log
|
||||
$schedule
|
||||
->setAttribute('resourceUpdatedAt', DateTime::now())
|
||||
->setAttribute('active', false)
|
||||
;
|
||||
|
||||
Authorization::skip(function () use ($dbForConsole, $log) {
|
||||
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
|
||||
Authorization::skip(function () use ($dbForConsole, $schedule) {
|
||||
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
|
||||
});
|
||||
|
||||
$deletes
|
||||
|
@ -794,18 +794,18 @@ App::post('/v1/functions/:functionId/deployments')
|
|||
* TODO Should we update also the function collection with the scheduleUpdatedAt attr?
|
||||
*/
|
||||
|
||||
$log = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
$schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
|
||||
$active = !empty($function->getAttribute('schedule'));
|
||||
|
||||
if ($active) {
|
||||
$log->setAttribute('resourceUpdatedAt', datetime::now());
|
||||
$schedule->setAttribute('resourceUpdatedAt', datetime::now());
|
||||
}
|
||||
|
||||
$log->setAttribute('active', $active);
|
||||
$schedule->setAttribute('active', $active);
|
||||
|
||||
Authorization::skip(function () use ($dbForConsole, $log) {
|
||||
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
|
||||
Authorization::skip(function () use ($dbForConsole, $schedule) {
|
||||
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
|
||||
});
|
||||
|
||||
$metadata = null;
|
||||
|
|
|
@ -12,20 +12,19 @@ use Swoole\Timer;
|
|||
|
||||
const FUNCTION_UPDATE_TIMER = 60; //seconds
|
||||
const FUNCTION_ENQUEUE_TIMER = 60; //seconds
|
||||
const ENQUEUE_TIME_FRAME = 60 * 5; // 5 min
|
||||
sleep(4); // Todo prevent PDOException
|
||||
|
||||
const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min
|
||||
|
||||
sleep(4);
|
||||
/**
|
||||
* 1. first load from db with limit+offset --line 82--
|
||||
* 2. creating a 5-min offset array ($queue) --line 102--
|
||||
* 1. first load from db with limit+offset
|
||||
* 2. creating a 5-min offset array ($queue)
|
||||
* 3. First timer runs every minute, looping over $queue time slots (each slot is 1-min delta)
|
||||
* if the function matches the current minute it should be dispatched to the functions worker.
|
||||
* Then another translation is made to the cron pattern if it is in the next 5-min window
|
||||
* it is assigned again to the $queue. --line 172--.
|
||||
* it is assigned again to the $queue. .
|
||||
* 4. Second timer runs every X min and updates the $functions (large) list.
|
||||
* The query fetches only functions that [resourceUpdatedAt] attr changed from the
|
||||
* last time the timer that was fired (X min) --line 120--
|
||||
* last time the timer that was fired (X min)
|
||||
* If the function was deleted it is unsets from the list ($functions) and the $queue.
|
||||
* In the end of the timer the $queue is created again.
|
||||
*
|
||||
|
@ -43,7 +42,7 @@ $cli
|
|||
/**
|
||||
* Creating smaller functions list containing 5-min timeframe.
|
||||
*/
|
||||
$timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME);
|
||||
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME);
|
||||
foreach ($functions as $function) {
|
||||
$cron = new CronExpression($function['schedule']);
|
||||
$next = DateTime::format($cron->getNextRunDate());
|
||||
|
@ -107,8 +106,7 @@ $cli
|
|||
/**
|
||||
* The timer updates $functions from db on last resourceUpdatedAt attr in X-min.
|
||||
*/
|
||||
Co\run(
|
||||
function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
|
||||
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
$time = DateTime::now();
|
||||
$limit = 1000;
|
||||
|
@ -165,7 +163,7 @@ $cli
|
|||
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) {
|
||||
$timerStart = \microtime(true);
|
||||
$time = DateTime::now();
|
||||
$timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME); /** 5 min */
|
||||
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME);
|
||||
$slot = (new \DateTime())->format('Y-m-d H:i:00.000');
|
||||
|
||||
Console::info("Enqueue proc started at: $time");
|
||||
|
@ -175,17 +173,16 @@ $cli
|
|||
console::info(count($schedule) . " functions sent to worker for time slot " . $slot);
|
||||
|
||||
foreach ($schedule as $function) {
|
||||
/**
|
||||
* Enqueue function (here should be the Enqueue call
|
||||
*/
|
||||
//Console::warning("Enqueueing :{$function['resourceId']}");
|
||||
/**
|
||||
* Enqueue function (here should be the Enqueue call
|
||||
*/
|
||||
$cron = new CronExpression($function['schedule']);
|
||||
$next = DateTime::format($cron->getNextRunDate());
|
||||
|
||||
/**
|
||||
* If next schedule is in 5-min timeframe
|
||||
* and it was not removed or changed, re-enqueue the function.
|
||||
*/
|
||||
/**
|
||||
* If next schedule is in 5-min timeframe
|
||||
* and it was not removed or changed, re-enqueue the function.
|
||||
*/
|
||||
if (
|
||||
$next < $timeFrame &&
|
||||
!empty($functions[$function['resourceId']] &&
|
||||
|
@ -200,6 +197,6 @@ $cli
|
|||
$timerEnd = \microtime(true);
|
||||
Console::info("Queue timer: finished in " . ($timerEnd - $timerStart) . " seconds");
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
|
||||
});
|
||||
|
|
|
@ -108,7 +108,6 @@ services:
|
|||
- ./public:/usr/src/code/public
|
||||
- ./src:/usr/src/code/src
|
||||
- ./dev:/usr/local/dev
|
||||
#- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
|
||||
depends_on:
|
||||
- mariadb
|
||||
- redis
|
||||
|
@ -346,7 +345,6 @@ services:
|
|||
volumes:
|
||||
- ./app:/usr/src/code/app
|
||||
- ./src:/usr/src/code/src
|
||||
#- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
|
||||
depends_on:
|
||||
- redis
|
||||
- mariadb
|
||||
|
@ -584,7 +582,6 @@ services:
|
|||
volumes:
|
||||
- ./app:/usr/src/code/app
|
||||
- ./src:/usr/src/code/src
|
||||
#- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
|
||||
depends_on:
|
||||
- redis
|
||||
environment:
|
||||
|
|
Loading…
Reference in a new issue