Merge pull request #4668 from appwrite/refactor-scheduler-improvements
Improve schedule accuracy + simplify
This commit is contained in:
commit
6be62fb20e
5 changed files with 242 additions and 235 deletions
17
app/cli.php
17
app/cli.php
|
@ -56,20 +56,29 @@ CLI::setResource('dbForConsole', function ($pools, $cache) {
|
||||||
}, ['pools', 'cache']);
|
}, ['pools', 'cache']);
|
||||||
|
|
||||||
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
|
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
|
||||||
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache) {
|
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
|
||||||
|
|
||||||
|
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) {
|
||||||
if ($project->isEmpty() || $project->getId() === 'console') {
|
if ($project->isEmpty() || $project->getId() === 'console') {
|
||||||
return $dbForConsole;
|
return $dbForConsole;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$databaseName = $project->getAttribute('database');
|
||||||
|
|
||||||
|
if (isset($databases[$databaseName])) {
|
||||||
|
return $databases[$databaseName];
|
||||||
|
}
|
||||||
|
|
||||||
$dbAdapter = $pools
|
$dbAdapter = $pools
|
||||||
->get($project->getAttribute('database'))
|
->get($databaseName)
|
||||||
->pop()
|
->pop()
|
||||||
->getResource()
|
->getResource();
|
||||||
;
|
|
||||||
|
|
||||||
$database = new Database($dbAdapter, $cache);
|
$database = new Database($dbAdapter, $cache);
|
||||||
$database->setNamespace('_' . $project->getInternalId());
|
$database->setNamespace('_' . $project->getInternalId());
|
||||||
|
|
||||||
|
$databases[$databaseName] = $database;
|
||||||
|
|
||||||
return $database;
|
return $database;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ use Appwrite\Platform\Tasks\Doctor;
|
||||||
use Appwrite\Platform\Tasks\Install;
|
use Appwrite\Platform\Tasks\Install;
|
||||||
use Appwrite\Platform\Tasks\Maintenance;
|
use Appwrite\Platform\Tasks\Maintenance;
|
||||||
use Appwrite\Platform\Tasks\Migrate;
|
use Appwrite\Platform\Tasks\Migrate;
|
||||||
|
use Appwrite\Platform\Tasks\Schedule;
|
||||||
use Appwrite\Platform\Tasks\SDKs;
|
use Appwrite\Platform\Tasks\SDKs;
|
||||||
use Appwrite\Platform\Tasks\Specs;
|
use Appwrite\Platform\Tasks\Specs;
|
||||||
use Appwrite\Platform\Tasks\SSL;
|
use Appwrite\Platform\Tasks\SSL;
|
||||||
|
@ -28,6 +29,7 @@ class Tasks extends Service
|
||||||
->addAction(Doctor::getName(), new Doctor())
|
->addAction(Doctor::getName(), new Doctor())
|
||||||
->addAction(Install::getName(), new Install())
|
->addAction(Install::getName(), new Install())
|
||||||
->addAction(Maintenance::getName(), new Maintenance())
|
->addAction(Maintenance::getName(), new Maintenance())
|
||||||
|
->addAction(Schedule::getName(), new Schedule())
|
||||||
->addAction(Migrate::getName(), new Migrate())
|
->addAction(Migrate::getName(), new Migrate())
|
||||||
->addAction(SDKs::getName(), new SDKs())
|
->addAction(SDKs::getName(), new SDKs())
|
||||||
->addAction(VolumeSync::getName(), new VolumeSync())
|
->addAction(VolumeSync::getName(), new VolumeSync())
|
||||||
|
|
|
@ -139,6 +139,8 @@ class Maintenance extends Action
|
||||||
notifyDeleteExpiredSessions();
|
notifyDeleteExpiredSessions();
|
||||||
renewCertificates($dbForConsole);
|
renewCertificates($dbForConsole);
|
||||||
notifyDeleteCache($cacheRetention);
|
notifyDeleteCache($cacheRetention);
|
||||||
|
|
||||||
|
// TODO: @Meldiron Every probably 24h, look for schedules with active=false, that doesnt have function anymore. Dlete such schedule
|
||||||
}, $interval);
|
}, $interval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
225
src/Appwrite/Platform/Tasks/Schedule.php
Normal file
225
src/Appwrite/Platform/Tasks/Schedule.php
Normal file
|
@ -0,0 +1,225 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Tasks;
|
||||||
|
|
||||||
|
use Cron\CronExpression;
|
||||||
|
use Utopia\App;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\DateTime;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Query;
|
||||||
|
use Swoole\Timer;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Pools\Group;
|
||||||
|
|
||||||
|
use function Swoole\Coroutine\run;
|
||||||
|
|
||||||
|
class Schedule extends Action
|
||||||
|
{
|
||||||
|
public const FUNCTION_UPDATE_TIMER = 10; //seconds
|
||||||
|
public const FUNCTION_ENQUEUE_TIMER = 60; //seconds
|
||||||
|
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'schedule';
|
||||||
|
}
|
||||||
|
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$this
|
||||||
|
->desc('Execute functions scheduled in Appwrite')
|
||||||
|
->inject('pools')
|
||||||
|
->inject('dbForConsole')
|
||||||
|
->inject('getProjectDB')
|
||||||
|
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 1. Load all documents from 'schedules' collection to create local copy
|
||||||
|
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
|
||||||
|
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker.
|
||||||
|
*/
|
||||||
|
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
|
||||||
|
{
|
||||||
|
Console::title('Scheduler V1');
|
||||||
|
Console::success(APP_NAME . ' Scheduler v1 has started');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract only nessessary attributes to lower memory used.
|
||||||
|
*
|
||||||
|
* @var Document $schedule
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
$getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array {
|
||||||
|
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId'));
|
||||||
|
|
||||||
|
$function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId'));
|
||||||
|
|
||||||
|
return [
|
||||||
|
'resourceId' => $schedule->getAttribute('resourceId'),
|
||||||
|
'schedule' => $schedule->getAttribute('schedule'),
|
||||||
|
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
||||||
|
'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
|
||||||
|
'function' => $function, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
|
||||||
|
];
|
||||||
|
};
|
||||||
|
|
||||||
|
$schedules = []; // Local copy of 'schedules' collection
|
||||||
|
$lastSyncUpdate = DateTime::now();
|
||||||
|
|
||||||
|
$limit = 10000;
|
||||||
|
$sum = $limit;
|
||||||
|
$total = 0;
|
||||||
|
$loadStart = \microtime(true);
|
||||||
|
$latestDocument = null;
|
||||||
|
|
||||||
|
while ($sum === $limit) {
|
||||||
|
$paginationQueries = [Query::limit($limit)];
|
||||||
|
if ($latestDocument !== null) {
|
||||||
|
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
||||||
|
}
|
||||||
|
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
||||||
|
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||||
|
Query::equal('resourceType', ['function']),
|
||||||
|
Query::equal('active', [true]),
|
||||||
|
]));
|
||||||
|
|
||||||
|
$sum = count($results);
|
||||||
|
$total = $total + $sum;
|
||||||
|
foreach ($results as $document) {
|
||||||
|
$schedules[$document['resourceId']] = $getSchedule($document);
|
||||||
|
}
|
||||||
|
|
||||||
|
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
$pools->reclaim();
|
||||||
|
|
||||||
|
Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds");
|
||||||
|
|
||||||
|
Console::success("Starting timers at " . DateTime::now());
|
||||||
|
|
||||||
|
run(
|
||||||
|
function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) {
|
||||||
|
/**
|
||||||
|
* The timer synchronize $schedules copy with database collection.
|
||||||
|
*/
|
||||||
|
Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) {
|
||||||
|
$time = DateTime::now();
|
||||||
|
$timerStart = \microtime(true);
|
||||||
|
|
||||||
|
$limit = 1000;
|
||||||
|
$sum = $limit;
|
||||||
|
$total = 0;
|
||||||
|
$latestDocument = null;
|
||||||
|
|
||||||
|
Console::log("Sync tick: Running at $time");
|
||||||
|
|
||||||
|
while ($sum === $limit) {
|
||||||
|
$paginationQueries = [Query::limit($limit)];
|
||||||
|
if ($latestDocument !== null) {
|
||||||
|
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
||||||
|
}
|
||||||
|
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
||||||
|
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||||
|
Query::equal('resourceType', ['function']),
|
||||||
|
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
|
||||||
|
]));
|
||||||
|
|
||||||
|
$sum = count($results);
|
||||||
|
$total = $total + $sum;
|
||||||
|
foreach ($results as $document) {
|
||||||
|
$localDocument = $schedules[$document['resourceId']] ?? null;
|
||||||
|
|
||||||
|
$org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null;
|
||||||
|
$new = strtotime($document['resourceUpdatedAt']);
|
||||||
|
|
||||||
|
if ($document['active'] === false) {
|
||||||
|
Console::info("Removing: {$document['resourceId']}");
|
||||||
|
unset($schedules[$document['resourceId']]);
|
||||||
|
} elseif ($new !== $org) {
|
||||||
|
Console::info("Updating: {$document['resourceId']}");
|
||||||
|
$schedules[$document['resourceId']] = $getSchedule($document);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
$lastSyncUpdate = $time;
|
||||||
|
$timerEnd = \microtime(true);
|
||||||
|
|
||||||
|
$pools->reclaim();
|
||||||
|
|
||||||
|
Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds");
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The timer to prepare soon-to-execute schedules.
|
||||||
|
*/
|
||||||
|
$lastEnqueueUpdate = null;
|
||||||
|
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) {
|
||||||
|
$timerStart = \microtime(true);
|
||||||
|
$time = DateTime::now();
|
||||||
|
|
||||||
|
$enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate;
|
||||||
|
$timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff);
|
||||||
|
|
||||||
|
Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");
|
||||||
|
|
||||||
|
$total = 0;
|
||||||
|
|
||||||
|
$delayedExecutions = []; // Group executions with same delay to share one coroutine
|
||||||
|
|
||||||
|
foreach ($schedules as $key => $schedule) {
|
||||||
|
$cron = new CronExpression($schedule['schedule']);
|
||||||
|
$nextDate = $cron->getNextRunDate();
|
||||||
|
$next = DateTime::format($nextDate);
|
||||||
|
|
||||||
|
$currentTick = $next < $timeFrame;
|
||||||
|
|
||||||
|
if (!$currentTick) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$total++;
|
||||||
|
|
||||||
|
$promiseStart = \microtime(true); // in seconds
|
||||||
|
$executionStart = $nextDate->getTimestamp(); // in seconds
|
||||||
|
$executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
|
||||||
|
|
||||||
|
$delay = \ceil(\intval($executionSleep));
|
||||||
|
|
||||||
|
if (!isset($delayedExecutions[$delay])) {
|
||||||
|
$delayedExecutions[$delay] = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
$delayedExecutions[$delay][] = $key;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($delayedExecutions as $delay => $scheduleKeys) {
|
||||||
|
\go(function () use ($delay, $schedules, $scheduleKeys) {
|
||||||
|
\sleep($delay); // in seconds
|
||||||
|
|
||||||
|
foreach ($scheduleKeys as $scheduleKey) {
|
||||||
|
// Ensure schedule was not deleted
|
||||||
|
if (!isset($schedules[$scheduleKey])) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
$timerEnd = \microtime(true);
|
||||||
|
$lastEnqueueUpdate = $timerStart;
|
||||||
|
Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds");
|
||||||
|
};
|
||||||
|
|
||||||
|
Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions());
|
||||||
|
$enqueueFunctions();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,231 +0,0 @@
|
||||||
<?php
|
|
||||||
|
|
||||||
global $cli;
|
|
||||||
global $register;
|
|
||||||
|
|
||||||
use Cron\CronExpression;
|
|
||||||
use Utopia\App;
|
|
||||||
use Utopia\CLI\Console;
|
|
||||||
use Utopia\Database\DateTime;
|
|
||||||
use Utopia\Database\Document;
|
|
||||||
use Utopia\Database\Query;
|
|
||||||
use Swoole\Timer;
|
|
||||||
|
|
||||||
const FUNCTION_UPDATE_TIMER = 60; //seconds
|
|
||||||
const FUNCTION_ENQUEUE_TIMER = 60; //seconds
|
|
||||||
const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min
|
|
||||||
|
|
||||||
sleep(4);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 1. first load from db with limit+offset
|
|
||||||
* 2. creating a 5-min offset array ($queue)
|
|
||||||
* 3. First timer runs every minute, looping over $queue time slots (each slot is 1-min delta)
|
|
||||||
* if the function matches the current minute it should be dispatched to the functions worker.
|
|
||||||
* Then another translation is made to the cron pattern if it is in the next 5-min window
|
|
||||||
* it is assigned again to the $queue. .
|
|
||||||
* 4. Second timer runs every X min and updates the $functions (large) list.
|
|
||||||
* The query fetches only functions that [resourceUpdatedAt] attr changed from the
|
|
||||||
* last time the timer that was fired (X min)
|
|
||||||
* If the function was deleted it is unsets from the list ($functions) and the $queue.
|
|
||||||
* In the end of the timer the $queue is created again.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
$cli
|
|
||||||
->task('schedule')
|
|
||||||
->desc('Function scheduler task')
|
|
||||||
->action(function () use ($register) {
|
|
||||||
Console::title('Scheduler V1');
|
|
||||||
Console::success(APP_NAME . ' Scheduler v1 has started');
|
|
||||||
|
|
||||||
$dbForConsole = getConsoleDB();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
$createQueue = function () use (&$functions, &$queue): void {
|
|
||||||
$loadStart = \microtime(true);
|
|
||||||
/**
|
|
||||||
* Creating smaller functions list containing 5-min timeframe.
|
|
||||||
*/
|
|
||||||
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME);
|
|
||||||
foreach ($functions as $function) {
|
|
||||||
$cron = new CronExpression($function['schedule']);
|
|
||||||
$next = DateTime::format($cron->getNextRunDate());
|
|
||||||
|
|
||||||
if ($next < $timeFrame) {
|
|
||||||
$queue[$next][$function['resourceId']] = $function;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Console::success("Queue was built in " . (microtime(true) - $loadStart) . " seconds");
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param string $id
|
|
||||||
* @param string $resourceId
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
$removeFromQueue = function (string $resourceId) use (&$queue, &$functions, $dbForConsole) {
|
|
||||||
if (array_key_exists($resourceId, $functions)) {
|
|
||||||
unset($functions[$resourceId]);
|
|
||||||
Console::error("Removing :{$resourceId} from functions list");
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($queue as $slot => $schedule) {
|
|
||||||
if (array_key_exists($resourceId, $schedule)) {
|
|
||||||
unset($queue[$slot][$resourceId]);
|
|
||||||
Console::error("Removing :{$resourceId} from queue slot $slot");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param string $resourceId
|
|
||||||
* @param array $update
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
$updateQueue = function (string $resourceId, array $update) use (&$queue, &$functions): void {
|
|
||||||
|
|
||||||
$functions[$resourceId] = $update;
|
|
||||||
Console::error("Updating :{$resourceId} in functions list");
|
|
||||||
|
|
||||||
foreach ($queue as $slot => $schedule) {
|
|
||||||
if (array_key_exists($resourceId, $schedule)) {
|
|
||||||
$queue[$slot][$resourceId] = $update;
|
|
||||||
Console::error("Updating :{$resourceId} in queue slot $slot");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @var Document $schedule
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
$getSchedule = function (Document $schedule) use ($dbForConsole): array {
|
|
||||||
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('schedule'));
|
|
||||||
|
|
||||||
return [
|
|
||||||
'resourceId' => $schedule->getAttribute('resourceId'),
|
|
||||||
'schedule' => $schedule->getAttribute('schedule'),
|
|
||||||
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
|
||||||
'project' => $project,
|
|
||||||
//'function' => getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId'))
|
|
||||||
];
|
|
||||||
};
|
|
||||||
|
|
||||||
$limit = 10000;
|
|
||||||
$sum = $limit;
|
|
||||||
$functions = [];
|
|
||||||
$queue = [];
|
|
||||||
$total = 0;
|
|
||||||
$loadStart = \microtime(true);
|
|
||||||
$latestDocument = null;
|
|
||||||
|
|
||||||
while ($sum === $limit) {
|
|
||||||
$paginationQueries = [Query::limit($limit)];
|
|
||||||
if ($latestDocument !== null) {
|
|
||||||
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
|
||||||
}
|
|
||||||
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
|
||||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
|
||||||
Query::equal('resourceType', ['function']),
|
|
||||||
Query::equal('active', [true]),
|
|
||||||
]));
|
|
||||||
|
|
||||||
$sum = count($results);
|
|
||||||
$total = $total + $sum;
|
|
||||||
foreach ($results as $document) {
|
|
||||||
$functions[$document['resourceId']] = $getSchedule($document);
|
|
||||||
}
|
|
||||||
|
|
||||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds");
|
|
||||||
$createQueue();
|
|
||||||
$lastUpdate = DateTime::addSeconds(new \DateTime(), -600); // 10 min
|
|
||||||
|
|
||||||
Co\run(
|
|
||||||
function () use ($getSchedule, $updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
|
||||||
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($getSchedule, $updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
|
||||||
$time = DateTime::now();
|
|
||||||
$limit = 1000;
|
|
||||||
$sum = $limit;
|
|
||||||
$total = 0;
|
|
||||||
$latestDocument = null;
|
|
||||||
$timerStart = \microtime(true);
|
|
||||||
|
|
||||||
Console::warning("Update proc started at: $time last update was at $lastUpdate");
|
|
||||||
|
|
||||||
while ($sum === $limit) {
|
|
||||||
$paginationQueries = [Query::limit($limit)];
|
|
||||||
if ($latestDocument !== null) {
|
|
||||||
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
|
||||||
}
|
|
||||||
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
|
||||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
|
||||||
Query::equal('resourceType', ['function']),
|
|
||||||
Query::greaterThanEqual('resourceUpdatedAt', $lastUpdate),
|
|
||||||
]));
|
|
||||||
|
|
||||||
$sum = count($results);
|
|
||||||
$total = $total + $sum;
|
|
||||||
foreach ($results as $document) {
|
|
||||||
$org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null;
|
|
||||||
$new = strtotime($document['resourceUpdatedAt']);
|
|
||||||
if ($document['active'] === false) {
|
|
||||||
$removeFromQueue($document['resourceId']);
|
|
||||||
} elseif ($new > $org) {
|
|
||||||
$updateQueue($document['resourceId'], $getSchedule($document));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
$lastUpdate = DateTime::now();
|
|
||||||
$createQueue();
|
|
||||||
Console::warning("Update timer: {$total} functions where updated in " . (microtime(true) - $timerStart) . " seconds");
|
|
||||||
});
|
|
||||||
|
|
||||||
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) {
|
|
||||||
$timerStart = \microtime(true);
|
|
||||||
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME);
|
|
||||||
$slot = (new \DateTime())->format('Y-m-d H:i:00.000');
|
|
||||||
|
|
||||||
Console::info("Enqueue proc started at: " . DateTime::now());
|
|
||||||
|
|
||||||
$count = 0;
|
|
||||||
if (array_key_exists($slot, $queue)) {
|
|
||||||
$schedule = $queue[$slot];
|
|
||||||
|
|
||||||
foreach ($schedule as $function) {
|
|
||||||
if (empty($functions[$function['resourceId']])) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$cron = new CronExpression($function['schedule']);
|
|
||||||
$next = DateTime::format($cron->getNextRunDate());
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If next schedule is in 5-min timeframe
|
|
||||||
* and it was not removed or changed, re-enqueue the function.
|
|
||||||
*/
|
|
||||||
if (
|
|
||||||
$next < $timeFrame &&
|
|
||||||
$function['schedule'] ?? [] === $functions[$function['resourceId']]['schedule']
|
|
||||||
) {
|
|
||||||
$queue[$next][$function['resourceId']] = $function;
|
|
||||||
}
|
|
||||||
unset($queue[$slot][$function['resourceId']]); /** removing function from slot */
|
|
||||||
$count++;
|
|
||||||
}
|
|
||||||
unset($queue[$slot]); /** removing slot */
|
|
||||||
}
|
|
||||||
|
|
||||||
$timerEnd = \microtime(true);
|
|
||||||
Console::info("Queue timer: finished in " . ($timerEnd - $timerStart) . " seconds with {$count} functions");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
);
|
|
||||||
});
|
|
Loading…
Reference in a new issue