queue
This commit is contained in:
parent
ef7b53fbc7
commit
6f9dcae858
3 changed files with 56 additions and 21 deletions
|
@ -458,16 +458,10 @@ App::put('/v1/functions/:functionId')
|
|||
|
||||
$enabled ??= $function->getAttribute('enabled', true);
|
||||
|
||||
$log = $schedule
|
||||
->setAttribute('resourceUpdatedAt', $function['resourceUpdatedAt'])
|
||||
->setAttribute('schedule', $function['schedule'])
|
||||
->setAttribute('active', !empty($function->getAttribute('schedule') || !empty($schedule)));
|
||||
|
||||
$function = $dbForProject->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [
|
||||
'execute' => $execute,
|
||||
'name' => $name,
|
||||
'events' => $events,
|
||||
'scheduleId' => $log->getId(),
|
||||
'schedule' => $schedule,
|
||||
'scheduleUpdatedAt' => DateTime::now(),
|
||||
'timeout' => $timeout,
|
||||
|
@ -475,6 +469,15 @@ App::put('/v1/functions/:functionId')
|
|||
'search' => implode(' ', [$functionId, $name, $function->getAttribute('runtime')]),
|
||||
])));
|
||||
|
||||
$log = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
|
||||
$log
|
||||
->setAttribute('resourceUpdatedAt', $function['scheduleUpdatedAt'])
|
||||
->setAttribute('schedule', $function['schedule'])
|
||||
->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment')));
|
||||
|
||||
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
|
||||
|
||||
$eventsInstance->setParam('functionId', $function->getId());
|
||||
|
||||
$response->dynamic($function, Response::MODEL_FUNCTION);
|
||||
|
@ -527,9 +530,15 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId')
|
|||
'deployment' => $deployment->getId()
|
||||
])));
|
||||
|
||||
$log = $dbForProject->getDocument('schedules', $function['resourceId']);
|
||||
$log = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
|
||||
$log->setAttribute('active', true);
|
||||
$active = !empty($function->getAttribute('schedule'));
|
||||
|
||||
if ($active) {
|
||||
$log->setAttribute('resourceUpdatedAt', datetime::now());
|
||||
}
|
||||
|
||||
$log->setAttribute('active', $active);
|
||||
|
||||
Authorization::skip(function () use ($dbForConsole, $log) {
|
||||
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
|
||||
|
@ -574,7 +583,7 @@ App::delete('/v1/functions/:functionId')
|
|||
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB');
|
||||
}
|
||||
|
||||
$log = $dbForProject->getDocument('schedules', $function['resourceId']);
|
||||
$log = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
|
||||
$log
|
||||
->setAttribute('resourceUpdatedAt', DateTime::now())
|
||||
|
@ -774,9 +783,19 @@ App::post('/v1/functions/:functionId/deployments')
|
|||
}
|
||||
}
|
||||
|
||||
$log = $dbForProject->getDocument('schedules', $function['resourceId']);
|
||||
/**
|
||||
* TODO Should we update also the function collection with the scheduleUpdatedAt attr?
|
||||
*/
|
||||
|
||||
$log->setAttribute('active', true);
|
||||
$log = $dbForConsole->getDocument('schedules', $function['scheduleId']);
|
||||
|
||||
$active = !empty($function->getAttribute('schedule'));
|
||||
|
||||
if ($active) {
|
||||
$log->setAttribute('resourceUpdatedAt', datetime::now());
|
||||
}
|
||||
|
||||
$log->setAttribute('active', $active);
|
||||
|
||||
Authorization::skip(function () use ($dbForConsole, $log) {
|
||||
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
<?php
|
||||
|
||||
ini_set('memory_limit', -1);
|
||||
ini_set('max_execution_time', -1);
|
||||
global $cli;
|
||||
global $register;
|
||||
|
||||
|
@ -15,6 +16,8 @@ const FUNCTION_ENQUEUE_TIMER = 60; //seconds
|
|||
const ENQUEUE_TIME_FRAME = 60 * 5; // 5 min
|
||||
sleep(4); // Todo prevent PDOException
|
||||
|
||||
|
||||
|
||||
$cli
|
||||
->task('schedule')
|
||||
->desc('Function scheduler task')
|
||||
|
@ -23,6 +26,7 @@ $cli
|
|||
Console::success(APP_NAME . ' Scheduler v1 has started');
|
||||
|
||||
$createQueue = function () use (&$functions, &$queue) {
|
||||
$loadStart = \microtime(true);
|
||||
/**
|
||||
* Creating smaller functions list containing 5-min timeframe.
|
||||
*/
|
||||
|
@ -34,6 +38,9 @@ $cli
|
|||
$queue[$next][$function['resourceId']] = $function;
|
||||
}
|
||||
}
|
||||
$loadEnd = \microtime(true);
|
||||
Console::error("Queue was built in " . ($loadEnd - $loadStart) . " seconds");
|
||||
|
||||
};
|
||||
|
||||
$removeFromQueue = function ($scheduleId) use (&$queue) {
|
||||
|
@ -49,12 +56,13 @@ $cli
|
|||
|
||||
|
||||
$dbForConsole = getConsoleDB();
|
||||
$count = 0;
|
||||
$limit = 50;
|
||||
$limit = 200;
|
||||
$sum = $limit;
|
||||
$functions = [];
|
||||
$queue = [];
|
||||
|
||||
$count = 0;
|
||||
$loadStart = \microtime(true);
|
||||
$total = 0;
|
||||
/**
|
||||
* Initial run fill $functions list
|
||||
*/
|
||||
|
@ -63,17 +71,24 @@ $cli
|
|||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||
Query::equal('resourceType', ['function']),
|
||||
Query::equal('active', [true]),
|
||||
Query::limit($limit)
|
||||
Query::offset($count * $limit),
|
||||
Query::limit($limit),
|
||||
]);
|
||||
|
||||
$sum = count($results);
|
||||
|
||||
$total = $total + $sum;
|
||||
foreach ($results as $document) {
|
||||
$functions[$document['resourceId']] = $document;
|
||||
$count++;
|
||||
}
|
||||
$count++;
|
||||
}
|
||||
|
||||
$loadEnd = \microtime(true);
|
||||
Console::error("{$total} functions where loaded in " . ($loadEnd - $loadStart) . " seconds");
|
||||
|
||||
$createQueue();
|
||||
|
||||
$lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_VALIDATION_TIMER);
|
||||
|
||||
Co\run(
|
||||
|
@ -88,12 +103,13 @@ $cli
|
|||
/**
|
||||
* Updating functions list from DB.
|
||||
*/
|
||||
while ($sum === $limit) {
|
||||
while (!empty($sum)) {
|
||||
$results = $dbForConsole->find('schedules', [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||
Query::equal('resourceType', ['function']),
|
||||
Query::greaterThan('resourceUpdatedAt', $lastUpdate),
|
||||
Query::limit($limit)
|
||||
Query::limit($limit),
|
||||
Query::offset($count * $limit),
|
||||
]);
|
||||
$sum = count($results);
|
||||
foreach ($results as $document) {
|
||||
|
@ -107,8 +123,8 @@ $cli
|
|||
$functions[$document['resourceId']] = $document;
|
||||
}
|
||||
$removeFromQueue($document['resourceId']);
|
||||
$count++;
|
||||
}
|
||||
$count++;
|
||||
}
|
||||
|
||||
$lastUpdate = DateTime::now();
|
||||
|
|
|
@ -108,7 +108,7 @@ services:
|
|||
- ./public:/usr/src/code/public
|
||||
- ./src:/usr/src/code/src
|
||||
- ./dev:/usr/local/dev
|
||||
- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
|
||||
#- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
|
||||
depends_on:
|
||||
- mariadb
|
||||
- redis
|
||||
|
|
Loading…
Reference in a new issue