Merge remote-tracking branch 'origin/refactor-scheduler' into refactor-scheduler-improvements
This commit is contained in:
commit
9626d86b22
2 changed files with 13 additions and 11 deletions
|
@ -19,7 +19,6 @@ const FUNCTION_ENQUEUE_TIMER = 10; //seconds
|
||||||
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
|
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
|
||||||
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker.
|
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
$cli
|
$cli
|
||||||
->task('schedule')
|
->task('schedule')
|
||||||
->desc('Function scheduler task')
|
->desc('Function scheduler task')
|
||||||
|
@ -35,14 +34,18 @@ $cli
|
||||||
* @var Document $schedule
|
* @var Document $schedule
|
||||||
* @return array
|
* @return array
|
||||||
*/
|
*/
|
||||||
function getsSheduleAttributes(Document $schedule): array
|
$getSchedule = function (Document $schedule) use ($dbForConsole): array {
|
||||||
{
|
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId'));
|
||||||
|
$function = getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId'));
|
||||||
|
|
||||||
return [
|
return [
|
||||||
'resourceId' => $schedule->getAttribute('resourceId'),
|
'resourceId' => $schedule->getAttribute('resourceId'),
|
||||||
'schedule' => $schedule->getAttribute('schedule'),
|
'schedule' => $schedule->getAttribute('schedule'),
|
||||||
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
||||||
|
'project' => $project,
|
||||||
|
'function' => $function,
|
||||||
];
|
];
|
||||||
}
|
};
|
||||||
|
|
||||||
$schedules = []; // Local copy of 'schedules' collection
|
$schedules = []; // Local copy of 'schedules' collection
|
||||||
$lastSyncUpdate = DateTime::now();
|
$lastSyncUpdate = DateTime::now();
|
||||||
|
@ -67,7 +70,7 @@ $cli
|
||||||
$sum = count($results);
|
$sum = count($results);
|
||||||
$total = $total + $sum;
|
$total = $total + $sum;
|
||||||
foreach ($results as $document) {
|
foreach ($results as $document) {
|
||||||
$schedules[$document['resourceId']] = getsSheduleAttributes($document);
|
$schedules[$document['resourceId']] = $getSchedule($document);
|
||||||
}
|
}
|
||||||
|
|
||||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||||
|
@ -80,11 +83,11 @@ $cli
|
||||||
Console::success("Starting timers at {$time}");
|
Console::success("Starting timers at {$time}");
|
||||||
|
|
||||||
Co\run(
|
Co\run(
|
||||||
function () use ($dbForConsole, &$schedules, &$lastSyncUpdate) {
|
function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) {
|
||||||
/**
|
/**
|
||||||
* The timer synchronize $schedules copy with database collection.
|
* The timer synchronize $schedules copy with database collection.
|
||||||
*/
|
*/
|
||||||
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate) {
|
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) {
|
||||||
$time = DateTime::now();
|
$time = DateTime::now();
|
||||||
$timerStart = \microtime(true);
|
$timerStart = \microtime(true);
|
||||||
|
|
||||||
|
@ -114,15 +117,14 @@ $cli
|
||||||
$org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null;
|
$org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null;
|
||||||
$new = strtotime($document['resourceUpdatedAt']);
|
$new = strtotime($document['resourceUpdatedAt']);
|
||||||
|
|
||||||
if ($$document['active'] === false) {
|
if ($document['active'] === false) {
|
||||||
Console::info("Removing: {$document['resourceId']}");
|
Console::info("Removing: {$document['resourceId']}");
|
||||||
unset($schedules[$document['resourceId']]);
|
unset($schedules[$document['resourceId']]);
|
||||||
} elseif ($new !== $org) {
|
} elseif ($new !== $org) {
|
||||||
Console::info("Updating: {$document['resourceId']}");
|
Console::info("Updating: {$document['resourceId']}");
|
||||||
$schedules[$document['resourceId']] = getsSheduleAttributes($document);
|
$schedules[$document['resourceId']] = $getSchedule($document);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -764,7 +764,7 @@ services:
|
||||||
|
|
||||||
mariadb:
|
mariadb:
|
||||||
image: mariadb:10.7 # fix issues when upgrading using: mysql_upgrade -u root -p
|
image: mariadb:10.7 # fix issues when upgrading using: mysql_upgrade -u root -p
|
||||||
container_name: mariadb
|
container_name: appwrite-mariadb
|
||||||
<<: *x-logging
|
<<: *x-logging
|
||||||
networks:
|
networks:
|
||||||
- appwrite
|
- appwrite
|
||||||
|
|
Loading…
Reference in a new issue