From 5e7de81966ba0a4d6548df3552a4a6aed4053452 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Mon, 14 Nov 2022 09:29:30 +0000 Subject: [PATCH 01/17] Improve schedule accuracy + simplify --- app/tasks/schedule.php | 218 ++++++++++++++-------------------------- app/workers/deletes.php | 5 + 2 files changed, 78 insertions(+), 145 deletions(-) diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index ded7301208..33e030dbb7 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -11,98 +11,27 @@ use Utopia\Database\Document; use Utopia\Database\Query; use Swoole\Timer; -const FUNCTION_UPDATE_TIMER = 60; //seconds -const FUNCTION_ENQUEUE_TIMER = 60; //seconds -const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min -const FUNCTION_RESET_TIMER_TO = 50; // seconds - -sleep(4); +const FUNCTION_UPDATE_TIMER = 10; //seconds +const FUNCTION_ENQUEUE_TIMER = 10; //seconds /** - * 1. first load from db with limit+offset - * 2. creating a 5-min offset array ($queue) - * 3. First timer runs every minute, looping over $queue time slots (each slot is 1-min delta) - * if the function matches the current minute it should be dispatched to the functions worker. - * Then another translation is made to the cron pattern if it is in the next 5-min window - * it is assigned again to the $queue. . - * 4. Second timer runs every X min and updates the $functions (large) list. - * The query fetches only functions that [resourceUpdatedAt] attr changed from the - * last time the timer that was fired (X min) - * If the function was deleted it is unsets from the list ($functions) and the $queue. - * In the end of the timer the $queue is created again. - * + * 1. Load all documents from 'schedules' collection to create local copy + * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute + * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. */ $cli ->task('schedule') ->desc('Function scheduler task') -->action(function () use ($register) { +->action(function () { Console::title('Scheduler V1'); Console::success(APP_NAME . ' Scheduler v1 has started'); $dbForConsole = getConsoleDB(); /** - * @return void - */ - $createQueue = function () use (&$functions, &$queue): void { - $loadStart = \microtime(true); - /** - * Creating smaller functions list containing 5-min timeframe. - */ - $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME); - foreach ($functions as $function) { - $cron = new CronExpression($function['schedule']); - $next = DateTime::format($cron->getNextRunDate()); - - if ($next < $timeFrame) { - $queue[$next][$function['resourceId']] = $function; - } - } - $loadEnd = \microtime(true); - Console::success("Queue was built in " . ($loadEnd - $loadStart) . " seconds"); - //var_dump($queue); - }; - - /** - * @param string $id - * @param string $resourceId - * @return void - */ - $removeFromQueue = function (string $id, string $resourceId) use (&$queue, &$functions, $dbForConsole) { - if (array_key_exists($resourceId, $functions)) { - unset($functions[$resourceId]); - $dbForConsole->deleteDocument('schedules', $id); - Console::error("Removing :{$resourceId} from functions list"); - } - - foreach ($queue as $slot => $schedule) { - if (array_key_exists($resourceId, $schedule)) { - unset($queue[$slot][$resourceId]); - Console::error("Removing :{$resourceId} from queue slot $slot"); - } - } - }; - - /** - * @param string $resourceId - * @param array $update - * @return void - */ - $updateQueue = function (string $resourceId, array $update) use (&$queue, &$functions): void { - - $functions[$resourceId] = $update; - Console::error("Updating :{$resourceId} in functions list"); - - foreach ($queue as $slot => $schedule) { - if (array_key_exists($resourceId, $schedule)) { - $queue[$slot][$resourceId] = $update; - Console::error("Updating :{$resourceId} in queue slot $slot"); - } - } - }; - - /** + * Extract only nessessary attributes to lower memory used. + * * @var Document $schedule * @return array */ @@ -115,10 +44,11 @@ $cli ]; } + $schedules = []; // Local copy of 'schedules' collection + $lastSyncUpdate = DateTime::now(); + $limit = 10000; $sum = $limit; - $functions = []; - $queue = []; $total = 0; $loadStart = \microtime(true); $latestDocument = null; @@ -137,39 +67,33 @@ $cli $sum = count($results); $total = $total + $sum; foreach ($results as $document) { - $functions[$document['resourceId']] = getsSheduleAttributes($document); + $schedules[$document['resourceId']] = getsSheduleAttributes($document); } $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } $loadEnd = \microtime(true); - Console::success("{$total} functions where loaded in " . ($loadEnd - $loadStart) . " seconds"); - $createQueue(); - $lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_UPDATE_TIMER); - - do { - $second = time() % 60; - } while ($second < FUNCTION_RESET_TIMER_TO); + Console::success("{$total} schedules where loaded in " . ($loadEnd - $loadStart) . " seconds"); $time = DateTime::now(); - Console::success("Starting timers at {$time}"); + Console::success("Starting timers at {$time}"); - - /** - * The timer updates $functions from db on last resourceUpdatedAt attr in X-min. - */ Co\run( - function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { - Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { + function () use ($dbForConsole, &$schedules, &$lastSyncUpdate) { + /** + * The timer synchronize $schedules copy with database collection. + */ + Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate) { $time = DateTime::now(); + $timerStart = \microtime(true); + $limit = 1000; $sum = $limit; $total = 0; $latestDocument = null; - $timerStart = \microtime(true); - //Console::warning("Update proc started at: $time last update was at $lastUpdate"); + Console::log("Sync tick: Running at $time"); while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; @@ -179,82 +103,86 @@ $cli $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ Query::equal('region', [App::getEnv('_APP_REGION')]), Query::equal('resourceType', ['function']), - Query::greaterThanEqual('resourceUpdatedAt', $lastUpdate), + Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), ])); $sum = count($results); $total = $total + $sum; foreach ($results as $document) { - $org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null; + $localDocument = $schedules[$document['resourceId']] ?? null; + + $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; $new = strtotime($document['resourceUpdatedAt']); - if ($document['active'] === false) { - //Console::warning("Removing: {$document['resourceId']}"); - $removeFromQueue($document->getId(), $document['resourceId']); - } elseif ($new > $org) { - //Console::warning("Updating: {$document['resourceId']}"); - $updateQueue($document['resourceId'], getsSheduleAttributes($document)); + + if ($$document['active'] === false) { + Console::info("Removing: {$document['resourceId']}"); + unset($schedules[$document['resourceId']]); + } elseif ($new !== $org) { + Console::info("Updating: {$document['resourceId']}"); + $schedules[$document['resourceId']] = getsSheduleAttributes($document); } } $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } - $lastUpdate = DateTime::now(); - $createQueue(); + $lastSyncUpdate = $time; $timerEnd = \microtime(true); - //Console::warning("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds"); + Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); }); /** - * The timer sends to worker every 1 min and re-enqueue matched functions. + * The timer to prepare soon-to-execute schedules. */ - Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) { + $lastEnqueueUpdate = null; + $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { $timerStart = \microtime(true); $time = DateTime::now(); - $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME); - $slot = (new \DateTime())->format('Y-m-d H:i:00.000'); - $prepareStart = time(); - Console::info("Enqueue proc started at: $time"); + $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; + $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMER - $enqueueDiff); - if (array_key_exists($slot, $queue)) { - $schedule = $queue[$slot]; - console::info(count($schedule) . " functions sent to worker for time slot " . $slot); - $totalPreparation = time() - $prepareStart; + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); - $wait = ((60 - FUNCTION_RESET_TIMER_TO) - $totalPreparation); - Console::info("Waiting for : {$wait} seconds"); - sleep($wait); + $total = 0; - $time = DateTime::now(); - Console::info("Start enqueueing at {$time}"); + foreach ($schedules as $key => $schedule) { + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); + $next = DateTime::format($nextDate); - foreach ($schedule as $function) { - if (empty($functions[$function['resourceId']])) { - continue; - } + $currentTick = $next < $timeFrame; - $cron = new CronExpression($function['schedule']); - $next = DateTime::format($cron->getNextRunDate()); - - /** - * If next schedule is in 5-min timeframe - * and it was not removed or changed, re-enqueue the function. - */ - if ( - $next < $timeFrame && - $function['schedule'] ?? [] === $functions[$function['resourceId']]['schedule'] - ) { - $queue[$next][$function['resourceId']] = $function; - } - unset($queue[$slot][$function['resourceId']]); /** removing function from slot */ + if(!$currentTick) { + continue; } - unset($queue[$slot]); /** removing slot */ + + $total++; + + $promiseStart = \microtime(true); // in seconds + $executionStart = $nextDate->getTimestamp(); // in seconds + $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued + + \go(function() use ($executionSleep, $key, $schedules) { + \usleep($executionSleep * 1000000); // in microseconds + + // Ensure schedule was not deleted + if(!isset($schedules[$key])) { + return; + } + + Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue + }); } + $timerEnd = \microtime(true); - Console::info("Queue timer: finished in " . ($timerEnd - $timerStart) . " seconds"); - }); + $lastEnqueueUpdate = $timerStart; + Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); + }; + + Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); + $enqueueFunctions(); } ); }); diff --git a/app/workers/deletes.php b/app/workers/deletes.php index 9ef593dbb9..1980c6dc72 100644 --- a/app/workers/deletes.php +++ b/app/workers/deletes.php @@ -418,6 +418,11 @@ class DeletesV1 extends Worker $dbForProject = $this->getProjectDB($project); $functionId = $document->getId(); + /** + * Delete Schedule + */ + // TODO: DeleteDocument schedules collection + /** * Delete Variables */ From c2e8fc5733ecdf43365e692c200e757de338563f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Mon, 14 Nov 2022 10:00:46 +0000 Subject: [PATCH 02/17] Increase timer delay --- app/tasks/schedule.php | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index 89899b5d67..6d742681b5 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -12,7 +12,7 @@ use Utopia\Database\Query; use Swoole\Timer; const FUNCTION_UPDATE_TIMER = 10; //seconds -const FUNCTION_ENQUEUE_TIMER = 10; //seconds +const FUNCTION_ENQUEUE_TIMER = 60; //seconds /** * 1. Load all documents from 'schedules' collection to create local copy @@ -76,11 +76,9 @@ $cli $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } - $loadEnd = \microtime(true); - Console::success("{$total} schedules where loaded in " . ($loadEnd - $loadStart) . " seconds"); + Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); - $time = DateTime::now(); - Console::success("Starting timers at {$time}"); + Console::success("Starting timers at " . DateTime::now()); Co\run( function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { From de8122c02fb5ddd0f98b9c262452c2e94b57d214 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Mon, 14 Nov 2022 13:02:16 +0000 Subject: [PATCH 03/17] Share coroutines for executions --- app/tasks/schedule.php | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index 6d742681b5..4734191987 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -147,6 +147,8 @@ $cli $total = 0; + $delayedExecutions = []; // Group executions with same delay to share one coroutine + foreach ($schedules as $key => $schedule) { $cron = new CronExpression($schedule['schedule']); $nextDate = $cron->getNextRunDate(); @@ -164,15 +166,27 @@ $cli $executionStart = $nextDate->getTimestamp(); // in seconds $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued - \go(function() use ($executionSleep, $key, $schedules) { - \usleep($executionSleep * 1000000); // in microseconds + $delay = \intval($executionSleep); + + if(!isset($delayedExecutions[$delay])) { + $delayedExecutions[$delay] = []; + } - // Ensure schedule was not deleted - if(!isset($schedules[$key])) { - return; + $delayedExecutions[$delay][] = $key; + } + + foreach($delayedExecutions as $delay => $scheduleKeys) { + \go(function() use ($delay, $schedules, $scheduleKeys) { + \sleep($delay); // in seconds + + foreach($scheduleKeys as $scheduleKey) { + // Ensure schedule was not deleted + if(!isset($schedules[$scheduleKey])) { + return; + } + + Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue } - - Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue }); } From ff06e4b2385439209fbfd83d163187c0e5408697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Tue, 15 Nov 2022 10:15:57 +0000 Subject: [PATCH 04/17] Move schedule delete TODO --- app/tasks/maintenance.php | 2 ++ app/workers/deletes.php | 5 ----- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/app/tasks/maintenance.php b/app/tasks/maintenance.php index 1c16ca6911..c61c3dda3c 100644 --- a/app/tasks/maintenance.php +++ b/app/tasks/maintenance.php @@ -126,5 +126,7 @@ $cli notifyDeleteExpiredSessions(); renewCertificates($database); notifyDeleteCache($cacheRetention); + + // TODO: @Meldiron Every probably 24h, look for schedules with active=false, that doesnt have function anymore. Dlete such schedule }, $interval); }); diff --git a/app/workers/deletes.php b/app/workers/deletes.php index 1980c6dc72..9ef593dbb9 100644 --- a/app/workers/deletes.php +++ b/app/workers/deletes.php @@ -418,11 +418,6 @@ class DeletesV1 extends Worker $dbForProject = $this->getProjectDB($project); $functionId = $document->getId(); - /** - * Delete Schedule - */ - // TODO: DeleteDocument schedules collection - /** * Delete Variables */ From 22effdd88adb1feefd68418af325267fc6b0fb2a Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 11:37:07 +0100 Subject: [PATCH 05/17] Refactor schedule task to new syntax --- src/Appwrite/Platform/Services/Tasks.php | 2 + src/Appwrite/Platform/Tasks/schedule.php | 386 ++++++++++++----------- 2 files changed, 202 insertions(+), 186 deletions(-) diff --git a/src/Appwrite/Platform/Services/Tasks.php b/src/Appwrite/Platform/Services/Tasks.php index 7f6a062ed4..2968a66b95 100644 --- a/src/Appwrite/Platform/Services/Tasks.php +++ b/src/Appwrite/Platform/Services/Tasks.php @@ -7,6 +7,7 @@ use Appwrite\Platform\Tasks\Doctor; use Appwrite\Platform\Tasks\Install; use Appwrite\Platform\Tasks\Maintenance; use Appwrite\Platform\Tasks\Migrate; +use Appwrite\Platform\Tasks\Schedule; use Appwrite\Platform\Tasks\SDKs; use Appwrite\Platform\Tasks\Specs; use Appwrite\Platform\Tasks\SSL; @@ -28,6 +29,7 @@ class Tasks extends Service ->addAction(Doctor::getName(), new Doctor()) ->addAction(Install::getName(), new Install()) ->addAction(Maintenance::getName(), new Maintenance()) + ->addAction(Schedule::getName(), new Schedule()) ->addAction(Migrate::getName(), new Migrate()) ->addAction(SDKs::getName(), new SDKs()) ->addAction(VolumeSync::getName(), new VolumeSync()) diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php index 4734191987..e070206405 100644 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ b/src/Appwrite/Platform/Tasks/schedule.php @@ -1,202 +1,216 @@ task('schedule') -->desc('Function scheduler task') -->action(function () { - Console::title('Scheduler V1'); - Console::success(APP_NAME . ' Scheduler v1 has started'); - - $dbForConsole = getConsoleDB(); - - /** - * Extract only nessessary attributes to lower memory used. - * - * @var Document $schedule - * @return array - */ - $getSchedule = function (Document $schedule) use ($dbForConsole): array { - $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); - $function = getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); - - return [ - 'resourceId' => $schedule->getAttribute('resourceId'), - 'schedule' => $schedule->getAttribute('schedule'), - 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), - 'project' => $project, - 'function' => $function, - ]; - }; - - $schedules = []; // Local copy of 'schedules' collection - $lastSyncUpdate = DateTime::now(); - - $limit = 10000; - $sum = $limit; - $total = 0; - $loadStart = \microtime(true); - $latestDocument = null; - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::equal('resourceType', ['function']), - Query::equal('active', [true]), - ])); - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $document) { - $schedules[$document['resourceId']] = $getSchedule($document); - } - - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + public static function getName(): string + { + return 'schedule'; } - Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); + public function __construct() + { + $this + ->desc('Execute functions scheduled in Appwrite') + ->inject('dbForConsole') + ->inject('getProjectDB') + ->callback(fn (Database $dbForConsole, callable $getProjectDB) => $this->action($dbForConsole, $getProjectDB)); + } - Console::success("Starting timers at " . DateTime::now()); - - Co\run( - function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { - /** - * The timer synchronize $schedules copy with database collection. - */ - Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { - $time = DateTime::now(); - $timerStart = \microtime(true); - - $limit = 1000; - $sum = $limit; - $total = 0; - $latestDocument = null; - - Console::log("Sync tick: Running at $time"); - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::equal('resourceType', ['function']), - Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), - ])); - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $document) { - $localDocument = $schedules[$document['resourceId']] ?? null; - - $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; - $new = strtotime($document['resourceUpdatedAt']); - - if ($document['active'] === false) { - Console::info("Removing: {$document['resourceId']}"); - unset($schedules[$document['resourceId']]); - } elseif ($new !== $org) { - Console::info("Updating: {$document['resourceId']}"); - $schedules[$document['resourceId']] = $getSchedule($document); - } - } - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - $lastSyncUpdate = $time; - $timerEnd = \microtime(true); - - Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); - }); - - /** - * The timer to prepare soon-to-execute schedules. - */ - $lastEnqueueUpdate = null; - $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { - $timerStart = \microtime(true); - $time = DateTime::now(); - - $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; - $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMER - $enqueueDiff); - - Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); - - $total = 0; - - $delayedExecutions = []; // Group executions with same delay to share one coroutine - - foreach ($schedules as $key => $schedule) { - $cron = new CronExpression($schedule['schedule']); - $nextDate = $cron->getNextRunDate(); - $next = DateTime::format($nextDate); - - $currentTick = $next < $timeFrame; - - if(!$currentTick) { - continue; - } - - $total++; - - $promiseStart = \microtime(true); // in seconds - $executionStart = $nextDate->getTimestamp(); // in seconds - $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued - - $delay = \intval($executionSleep); - - if(!isset($delayedExecutions[$delay])) { - $delayedExecutions[$delay] = []; - } - - $delayedExecutions[$delay][] = $key; - } - - foreach($delayedExecutions as $delay => $scheduleKeys) { - \go(function() use ($delay, $schedules, $scheduleKeys) { - \sleep($delay); // in seconds - - foreach($scheduleKeys as $scheduleKey) { - // Ensure schedule was not deleted - if(!isset($schedules[$scheduleKey])) { - return; - } - - Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue - } - }); - } - - $timerEnd = \microtime(true); - $lastEnqueueUpdate = $timerStart; - Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); - }; - - Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); - $enqueueFunctions(); + /** + * 1. Load all documents from 'schedules' collection to create local copy + * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute + * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. + */ + public function action(Database $dbForConsole, callable $getProjectDB): void + { + Console::title('Scheduler V1'); + Console::success(APP_NAME . ' Scheduler v1 has started'); + + /** + * Extract only nessessary attributes to lower memory used. + * + * @var Document $schedule + * @return array + */ + $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { + $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); + $function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); + + return [ + 'resourceId' => $schedule->getAttribute('resourceId'), + 'schedule' => $schedule->getAttribute('schedule'), + 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), + 'project' => $project, + 'function' => $function, + ]; + }; + + $schedules = []; // Local copy of 'schedules' collection + $lastSyncUpdate = DateTime::now(); + + $limit = 10000; + $sum = $limit; + $total = 0; + $loadStart = \microtime(true); + $latestDocument = null; + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('resourceType', ['function']), + Query::equal('active', [true]), + ])); + + $sum = count($results); + $total = $total + $sum; + foreach ($results as $document) { + $schedules[$document['resourceId']] = $getSchedule($document); + } + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } - ); -}); + + Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); + + Console::success("Starting timers at " . DateTime::now()); + + Co\run( + function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { + /** + * The timer synchronize $schedules copy with database collection. + */ + Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { + $time = DateTime::now(); + $timerStart = \microtime(true); + + $limit = 1000; + $sum = $limit; + $total = 0; + $latestDocument = null; + + Console::log("Sync tick: Running at $time"); + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('resourceType', ['function']), + Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), + ])); + + $sum = count($results); + $total = $total + $sum; + foreach ($results as $document) { + $localDocument = $schedules[$document['resourceId']] ?? null; + + $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; + $new = strtotime($document['resourceUpdatedAt']); + + if ($document['active'] === false) { + Console::info("Removing: {$document['resourceId']}"); + unset($schedules[$document['resourceId']]); + } elseif ($new !== $org) { + Console::info("Updating: {$document['resourceId']}"); + $schedules[$document['resourceId']] = $getSchedule($document); + } + } + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $lastSyncUpdate = $time; + $timerEnd = \microtime(true); + + Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); + }); + + /** + * The timer to prepare soon-to-execute schedules. + */ + $lastEnqueueUpdate = null; + $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { + $timerStart = \microtime(true); + $time = DateTime::now(); + + $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; + $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); + + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); + + $total = 0; + + $delayedExecutions = []; // Group executions with same delay to share one coroutine + + foreach ($schedules as $key => $schedule) { + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); + $next = DateTime::format($nextDate); + + $currentTick = $next < $timeFrame; + + if(!$currentTick) { + continue; + } + + $total++; + + $promiseStart = \microtime(true); // in seconds + $executionStart = $nextDate->getTimestamp(); // in seconds + $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued + + $delay = \intval($executionSleep); + + if(!isset($delayedExecutions[$delay])) { + $delayedExecutions[$delay] = []; + } + + $delayedExecutions[$delay][] = $key; + } + + foreach($delayedExecutions as $delay => $scheduleKeys) { + \go(function() use ($delay, $schedules, $scheduleKeys) { + \sleep($delay); // in seconds + + foreach($scheduleKeys as $scheduleKey) { + // Ensure schedule was not deleted + if(!isset($schedules[$scheduleKey])) { + return; + } + + Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue + } + }); + } + + $timerEnd = \microtime(true); + $lastEnqueueUpdate = $timerStart; + Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); + }; + + Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); + $enqueueFunctions(); + } + ); + } +} From b031e13997005c437004b1ade7e65fe0df006fcc Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 11:39:24 +0100 Subject: [PATCH 06/17] Improve reclaim of pools in schedule --- app/cli.php | 10 +++++----- src/Appwrite/Platform/Tasks/schedule.php | 5 ++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/app/cli.php b/app/cli.php index b176326191..3546674a13 100644 --- a/app/cli.php +++ b/app/cli.php @@ -61,16 +61,16 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, return $dbForConsole; } - $dbAdapter = $pools + $connection = $pools ->get($project->getAttribute('database')) - ->pop() - ->getResource() - ; + ->pop(); + + $dbAdapter = $connection->getResource(); $database = new Database($dbAdapter, $cache); $database->setNamespace('_' . $project->getInternalId()); - return $database; + return [ $database, fn() => $connection->claim() ]; }; return $getProjectDB; diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php index e070206405..0d88de1d05 100644 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ b/src/Appwrite/Platform/Tasks/schedule.php @@ -49,7 +49,10 @@ class Schedule extends Action */ $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); - $function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); + + [ $database, $reclaim ] = $getProjectDB($project); + $function = $database->getDocument('functions', $schedule->getAttribute('resourceId')); + $reclaim(); return [ 'resourceId' => $schedule->getAttribute('resourceId'), From c13589c1ea801d5e2ee92b2eb9d7421a14567d36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Tue, 15 Nov 2022 11:04:13 +0000 Subject: [PATCH 07/17] Bug fix --- src/Appwrite/Platform/Tasks/schedule.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php index 0d88de1d05..ca1de4164e 100644 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ b/src/Appwrite/Platform/Tasks/schedule.php @@ -12,6 +12,8 @@ use Utopia\Database\Query; use Swoole\Timer; use Utopia\Database\Database; +use function Swoole\Coroutine\run; + class Schedule extends Action { const FUNCTION_UPDATE_TIMER = 10; //seconds @@ -96,7 +98,7 @@ class Schedule extends Action Console::success("Starting timers at " . DateTime::now()); - Co\run( + run( function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { /** * The timer synchronize $schedules copy with database collection. From 1084631d0f8e5b31aef5381773779c8a986d5aff Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 12:23:08 +0100 Subject: [PATCH 08/17] Fux reclaim logic in CLI; Prevent early executions --- app/cli.php | 2 +- src/Appwrite/Platform/Tasks/schedule.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/cli.php b/app/cli.php index 3546674a13..c84adef11c 100644 --- a/app/cli.php +++ b/app/cli.php @@ -70,7 +70,7 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $database = new Database($dbAdapter, $cache); $database->setNamespace('_' . $project->getInternalId()); - return [ $database, fn() => $connection->claim() ]; + return [ $database, fn() => $connection->reclaim() ]; }; return $getProjectDB; diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php index ca1de4164e..798353ef57 100644 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ b/src/Appwrite/Platform/Tasks/schedule.php @@ -184,7 +184,7 @@ class Schedule extends Action $executionStart = $nextDate->getTimestamp(); // in seconds $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued - $delay = \intval($executionSleep); + $delay = \ceil(\intval($executionSleep)); if(!isset($delayedExecutions[$delay])) { $delayedExecutions[$delay] = []; From 9539e2790783a4a8ec26bb25a205a83bbba04edc Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 12:32:36 +0100 Subject: [PATCH 09/17] Linter fix --- src/Appwrite/Platform/Tasks/schedule.php | 90 ++++++++++++------------ 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php index 798353ef57..f9106e47c8 100644 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ b/src/Appwrite/Platform/Tasks/schedule.php @@ -16,8 +16,8 @@ use function Swoole\Coroutine\run; class Schedule extends Action { - const FUNCTION_UPDATE_TIMER = 10; //seconds - const FUNCTION_ENQUEUE_TIMER = 60; //seconds + public const FUNCTION_UPDATE_TIMER = 10; //seconds + public const FUNCTION_ENQUEUE_TIMER = 60; //seconds public static function getName(): string { @@ -42,10 +42,10 @@ class Schedule extends Action { Console::title('Scheduler V1'); Console::success(APP_NAME . ' Scheduler v1 has started'); - + /** * Extract only nessessary attributes to lower memory used. - * + * * @var Document $schedule * @return array */ @@ -55,7 +55,7 @@ class Schedule extends Action [ $database, $reclaim ] = $getProjectDB($project); $function = $database->getDocument('functions', $schedule->getAttribute('resourceId')); $reclaim(); - + return [ 'resourceId' => $schedule->getAttribute('resourceId'), 'schedule' => $schedule->getAttribute('schedule'), @@ -64,16 +64,16 @@ class Schedule extends Action 'function' => $function, ]; }; - + $schedules = []; // Local copy of 'schedules' collection $lastSyncUpdate = DateTime::now(); - + $limit = 10000; $sum = $limit; $total = 0; $loadStart = \microtime(true); $latestDocument = null; - + while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; if ($latestDocument !== null) { @@ -84,20 +84,20 @@ class Schedule extends Action Query::equal('resourceType', ['function']), Query::equal('active', [true]), ])); - + $sum = count($results); $total = $total + $sum; foreach ($results as $document) { $schedules[$document['resourceId']] = $getSchedule($document); } - + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } - + Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); - + Console::success("Starting timers at " . DateTime::now()); - + run( function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { /** @@ -106,14 +106,14 @@ class Schedule extends Action Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { $time = DateTime::now(); $timerStart = \microtime(true); - + $limit = 1000; $sum = $limit; $total = 0; $latestDocument = null; - + Console::log("Sync tick: Running at $time"); - + while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; if ($latestDocument !== null) { @@ -124,15 +124,15 @@ class Schedule extends Action Query::equal('resourceType', ['function']), Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), ])); - + $sum = count($results); $total = $total + $sum; foreach ($results as $document) { $localDocument = $schedules[$document['resourceId']] ?? null; - + $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; $new = strtotime($document['resourceUpdatedAt']); - + if ($document['active'] === false) { Console::info("Removing: {$document['resourceId']}"); unset($schedules[$document['resourceId']]); @@ -143,13 +143,13 @@ class Schedule extends Action } $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } - + $lastSyncUpdate = $time; $timerEnd = \microtime(true); - + Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); }); - + /** * The timer to prepare soon-to-execute schedules. */ @@ -157,62 +157,62 @@ class Schedule extends Action $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { $timerStart = \microtime(true); $time = DateTime::now(); - + $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); - + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); - + $total = 0; - + $delayedExecutions = []; // Group executions with same delay to share one coroutine - + foreach ($schedules as $key => $schedule) { $cron = new CronExpression($schedule['schedule']); $nextDate = $cron->getNextRunDate(); $next = DateTime::format($nextDate); - + $currentTick = $next < $timeFrame; - - if(!$currentTick) { + + if (!$currentTick) { continue; } - + $total++; - + $promiseStart = \microtime(true); // in seconds $executionStart = $nextDate->getTimestamp(); // in seconds $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued - + $delay = \ceil(\intval($executionSleep)); - - if(!isset($delayedExecutions[$delay])) { + + if (!isset($delayedExecutions[$delay])) { $delayedExecutions[$delay] = []; } - + $delayedExecutions[$delay][] = $key; } - - foreach($delayedExecutions as $delay => $scheduleKeys) { - \go(function() use ($delay, $schedules, $scheduleKeys) { + + foreach ($delayedExecutions as $delay => $scheduleKeys) { + \go(function () use ($delay, $schedules, $scheduleKeys) { \sleep($delay); // in seconds - - foreach($scheduleKeys as $scheduleKey) { + + foreach ($scheduleKeys as $scheduleKey) { // Ensure schedule was not deleted - if(!isset($schedules[$scheduleKey])) { + if (!isset($schedules[$scheduleKey])) { return; } - + Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue } }); } - + $timerEnd = \microtime(true); $lastEnqueueUpdate = $timerStart; Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); }; - + Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); $enqueueFunctions(); } From 727338cb9ac8a375cd5434886395ec8369c0ef66 Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 13:54:54 +0100 Subject: [PATCH 10/17] Imrpove pools relciam logic --- app/cli.php | 9 ++++----- src/Appwrite/Platform/Tasks/schedule.php | 18 +++++++++++------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/app/cli.php b/app/cli.php index c84adef11c..0160738f68 100644 --- a/app/cli.php +++ b/app/cli.php @@ -61,16 +61,15 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, return $dbForConsole; } - $connection = $pools + $dbAdapter = $pools ->get($project->getAttribute('database')) - ->pop(); - - $dbAdapter = $connection->getResource(); + ->pop() + ->getResource(); $database = new Database($dbAdapter, $cache); $database->setNamespace('_' . $project->getInternalId()); - return [ $database, fn() => $connection->reclaim() ]; + return $database; }; return $getProjectDB; diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php index f9106e47c8..36c28be141 100644 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ b/src/Appwrite/Platform/Tasks/schedule.php @@ -11,6 +11,7 @@ use Utopia\Database\Document; use Utopia\Database\Query; use Swoole\Timer; use Utopia\Database\Database; +use Utopia\Pools\Group; use function Swoole\Coroutine\run; @@ -28,9 +29,10 @@ class Schedule extends Action { $this ->desc('Execute functions scheduled in Appwrite') + ->inject('pools') ->inject('dbForConsole') ->inject('getProjectDB') - ->callback(fn (Database $dbForConsole, callable $getProjectDB) => $this->action($dbForConsole, $getProjectDB)); + ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); } /** @@ -38,7 +40,7 @@ class Schedule extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. */ - public function action(Database $dbForConsole, callable $getProjectDB): void + public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void { Console::title('Scheduler V1'); Console::success(APP_NAME . ' Scheduler v1 has started'); @@ -52,9 +54,7 @@ class Schedule extends Action $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); - [ $database, $reclaim ] = $getProjectDB($project); - $function = $database->getDocument('functions', $schedule->getAttribute('resourceId')); - $reclaim(); + $function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); return [ 'resourceId' => $schedule->getAttribute('resourceId'), @@ -93,17 +93,19 @@ class Schedule extends Action $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } + + $pools->reclaim(); Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); Console::success("Starting timers at " . DateTime::now()); run( - function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { + function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { /** * The timer synchronize $schedules copy with database collection. */ - Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule) { + Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { $time = DateTime::now(); $timerStart = \microtime(true); @@ -147,6 +149,8 @@ class Schedule extends Action $lastSyncUpdate = $time; $timerEnd = \microtime(true); + $pools->reclaim(); + Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); }); From ea5bd5160a2fda5339204328c5a1ed8b4fd5bea6 Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 14:01:06 +0100 Subject: [PATCH 11/17] Linter fix --- src/Appwrite/Platform/Tasks/schedule.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php index 36c28be141..2598f9cd62 100644 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ b/src/Appwrite/Platform/Tasks/schedule.php @@ -77,7 +77,7 @@ class Schedule extends Action while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); + $paginationQueries[] = Query::cursorAfter($latestDocument); } $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ Query::equal('region', [App::getEnv('_APP_REGION')]), @@ -93,7 +93,7 @@ class Schedule extends Action $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } - + $pools->reclaim(); Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); From bb3c99c58bd7d806a7dd645e852605420a4f8cc2 Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 14:01:24 +0100 Subject: [PATCH 12/17] Rename file step 1 --- src/Appwrite/Platform/Tasks/schedule.php | 225 ----------------------- 1 file changed, 225 deletions(-) delete mode 100644 src/Appwrite/Platform/Tasks/schedule.php diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php deleted file mode 100644 index 2598f9cd62..0000000000 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ /dev/null @@ -1,225 +0,0 @@ -desc('Execute functions scheduled in Appwrite') - ->inject('pools') - ->inject('dbForConsole') - ->inject('getProjectDB') - ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); - } - - /** - * 1. Load all documents from 'schedules' collection to create local copy - * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute - * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. - */ - public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void - { - Console::title('Scheduler V1'); - Console::success(APP_NAME . ' Scheduler v1 has started'); - - /** - * Extract only nessessary attributes to lower memory used. - * - * @var Document $schedule - * @return array - */ - $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { - $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); - - $function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); - - return [ - 'resourceId' => $schedule->getAttribute('resourceId'), - 'schedule' => $schedule->getAttribute('schedule'), - 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), - 'project' => $project, - 'function' => $function, - ]; - }; - - $schedules = []; // Local copy of 'schedules' collection - $lastSyncUpdate = DateTime::now(); - - $limit = 10000; - $sum = $limit; - $total = 0; - $loadStart = \microtime(true); - $latestDocument = null; - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::equal('resourceType', ['function']), - Query::equal('active', [true]), - ])); - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $document) { - $schedules[$document['resourceId']] = $getSchedule($document); - } - - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - $pools->reclaim(); - - Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); - - Console::success("Starting timers at " . DateTime::now()); - - run( - function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { - /** - * The timer synchronize $schedules copy with database collection. - */ - Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { - $time = DateTime::now(); - $timerStart = \microtime(true); - - $limit = 1000; - $sum = $limit; - $total = 0; - $latestDocument = null; - - Console::log("Sync tick: Running at $time"); - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::equal('resourceType', ['function']), - Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), - ])); - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $document) { - $localDocument = $schedules[$document['resourceId']] ?? null; - - $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; - $new = strtotime($document['resourceUpdatedAt']); - - if ($document['active'] === false) { - Console::info("Removing: {$document['resourceId']}"); - unset($schedules[$document['resourceId']]); - } elseif ($new !== $org) { - Console::info("Updating: {$document['resourceId']}"); - $schedules[$document['resourceId']] = $getSchedule($document); - } - } - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - $lastSyncUpdate = $time; - $timerEnd = \microtime(true); - - $pools->reclaim(); - - Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); - }); - - /** - * The timer to prepare soon-to-execute schedules. - */ - $lastEnqueueUpdate = null; - $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { - $timerStart = \microtime(true); - $time = DateTime::now(); - - $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; - $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); - - Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); - - $total = 0; - - $delayedExecutions = []; // Group executions with same delay to share one coroutine - - foreach ($schedules as $key => $schedule) { - $cron = new CronExpression($schedule['schedule']); - $nextDate = $cron->getNextRunDate(); - $next = DateTime::format($nextDate); - - $currentTick = $next < $timeFrame; - - if (!$currentTick) { - continue; - } - - $total++; - - $promiseStart = \microtime(true); // in seconds - $executionStart = $nextDate->getTimestamp(); // in seconds - $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued - - $delay = \ceil(\intval($executionSleep)); - - if (!isset($delayedExecutions[$delay])) { - $delayedExecutions[$delay] = []; - } - - $delayedExecutions[$delay][] = $key; - } - - foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $schedules, $scheduleKeys) { - \sleep($delay); // in seconds - - foreach ($scheduleKeys as $scheduleKey) { - // Ensure schedule was not deleted - if (!isset($schedules[$scheduleKey])) { - return; - } - - Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue - } - }); - } - - $timerEnd = \microtime(true); - $lastEnqueueUpdate = $timerStart; - Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); - }; - - Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); - $enqueueFunctions(); - } - ); - } -} From 451c6334bb2acc664913498c3c83cee357b67ec6 Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 14:01:44 +0100 Subject: [PATCH 13/17] Rename file step 2 --- src/Appwrite/Platform/Tasks/Schedule.php | 225 +++++++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 src/Appwrite/Platform/Tasks/Schedule.php diff --git a/src/Appwrite/Platform/Tasks/Schedule.php b/src/Appwrite/Platform/Tasks/Schedule.php new file mode 100644 index 0000000000..2598f9cd62 --- /dev/null +++ b/src/Appwrite/Platform/Tasks/Schedule.php @@ -0,0 +1,225 @@ +desc('Execute functions scheduled in Appwrite') + ->inject('pools') + ->inject('dbForConsole') + ->inject('getProjectDB') + ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); + } + + /** + * 1. Load all documents from 'schedules' collection to create local copy + * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute + * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. + */ + public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void + { + Console::title('Scheduler V1'); + Console::success(APP_NAME . ' Scheduler v1 has started'); + + /** + * Extract only nessessary attributes to lower memory used. + * + * @var Document $schedule + * @return array + */ + $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { + $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); + + $function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); + + return [ + 'resourceId' => $schedule->getAttribute('resourceId'), + 'schedule' => $schedule->getAttribute('schedule'), + 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), + 'project' => $project, + 'function' => $function, + ]; + }; + + $schedules = []; // Local copy of 'schedules' collection + $lastSyncUpdate = DateTime::now(); + + $limit = 10000; + $sum = $limit; + $total = 0; + $loadStart = \microtime(true); + $latestDocument = null; + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('resourceType', ['function']), + Query::equal('active', [true]), + ])); + + $sum = count($results); + $total = $total + $sum; + foreach ($results as $document) { + $schedules[$document['resourceId']] = $getSchedule($document); + } + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $pools->reclaim(); + + Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); + + Console::success("Starting timers at " . DateTime::now()); + + run( + function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { + /** + * The timer synchronize $schedules copy with database collection. + */ + Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { + $time = DateTime::now(); + $timerStart = \microtime(true); + + $limit = 1000; + $sum = $limit; + $total = 0; + $latestDocument = null; + + Console::log("Sync tick: Running at $time"); + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('resourceType', ['function']), + Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), + ])); + + $sum = count($results); + $total = $total + $sum; + foreach ($results as $document) { + $localDocument = $schedules[$document['resourceId']] ?? null; + + $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; + $new = strtotime($document['resourceUpdatedAt']); + + if ($document['active'] === false) { + Console::info("Removing: {$document['resourceId']}"); + unset($schedules[$document['resourceId']]); + } elseif ($new !== $org) { + Console::info("Updating: {$document['resourceId']}"); + $schedules[$document['resourceId']] = $getSchedule($document); + } + } + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $lastSyncUpdate = $time; + $timerEnd = \microtime(true); + + $pools->reclaim(); + + Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); + }); + + /** + * The timer to prepare soon-to-execute schedules. + */ + $lastEnqueueUpdate = null; + $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { + $timerStart = \microtime(true); + $time = DateTime::now(); + + $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; + $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); + + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); + + $total = 0; + + $delayedExecutions = []; // Group executions with same delay to share one coroutine + + foreach ($schedules as $key => $schedule) { + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); + $next = DateTime::format($nextDate); + + $currentTick = $next < $timeFrame; + + if (!$currentTick) { + continue; + } + + $total++; + + $promiseStart = \microtime(true); // in seconds + $executionStart = $nextDate->getTimestamp(); // in seconds + $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued + + $delay = \ceil(\intval($executionSleep)); + + if (!isset($delayedExecutions[$delay])) { + $delayedExecutions[$delay] = []; + } + + $delayedExecutions[$delay][] = $key; + } + + foreach ($delayedExecutions as $delay => $scheduleKeys) { + \go(function () use ($delay, $schedules, $scheduleKeys) { + \sleep($delay); // in seconds + + foreach ($scheduleKeys as $scheduleKey) { + // Ensure schedule was not deleted + if (!isset($schedules[$scheduleKey])) { + return; + } + + Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue + } + }); + } + + $timerEnd = \microtime(true); + $lastEnqueueUpdate = $timerStart; + Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); + }; + + Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); + $enqueueFunctions(); + } + ); + } +} From bfe8b9800862a1526bbb217348db19dbe0a9e7cd Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 14:30:19 +0100 Subject: [PATCH 14/17] Attempt to reuse db connections --- app/cli.php | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/app/cli.php b/app/cli.php index 0160738f68..9c77278c7e 100644 --- a/app/cli.php +++ b/app/cli.php @@ -56,19 +56,29 @@ CLI::setResource('dbForConsole', function ($pools, $cache) { }, ['pools', 'cache']); CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { - $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache) { + $databases = []; + + $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } + $databaseName = $project->getAttribute('database'); + + if(isset($databases[$databaseName])) { + return $databases[$databaseName]; + } + $dbAdapter = $pools - ->get($project->getAttribute('database')) + ->get($databaseName) ->pop() ->getResource(); $database = new Database($dbAdapter, $cache); $database->setNamespace('_' . $project->getInternalId()); + $databases[$databaseName] = $database; + return $database; }; From b6e4ec822bfb554bb8e910a3de35e4018610a102 Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 14:34:13 +0100 Subject: [PATCH 15/17] Add TODO --- app/cli.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/cli.php b/app/cli.php index 9c77278c7e..04e4d9e4ff 100644 --- a/app/cli.php +++ b/app/cli.php @@ -56,7 +56,7 @@ CLI::setResource('dbForConsole', function ($pools, $cache) { }, ['pools', 'cache']); CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { - $databases = []; + $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { From c07953a640ac2aaa7f0fe12eb0440ed936f93366 Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 14:35:13 +0100 Subject: [PATCH 16/17] Linter fix --- app/cli.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/cli.php b/app/cli.php index 04e4d9e4ff..502ee77b75 100644 --- a/app/cli.php +++ b/app/cli.php @@ -65,7 +65,7 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $databaseName = $project->getAttribute('database'); - if(isset($databases[$databaseName])) { + if (isset($databases[$databaseName])) { return $databases[$databaseName]; } From e04295918f27a7be337cf787aab0f7d969d7f880 Mon Sep 17 00:00:00 2001 From: Matej Baco Date: Tue, 15 Nov 2022 14:36:43 +0100 Subject: [PATCH 17/17] Add TODO --- src/Appwrite/Platform/Tasks/Schedule.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/Schedule.php b/src/Appwrite/Platform/Tasks/Schedule.php index 2598f9cd62..0e48d23883 100644 --- a/src/Appwrite/Platform/Tasks/Schedule.php +++ b/src/Appwrite/Platform/Tasks/Schedule.php @@ -60,8 +60,8 @@ class Schedule extends Action 'resourceId' => $schedule->getAttribute('resourceId'), 'schedule' => $schedule->getAttribute('schedule'), 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), - 'project' => $project, - 'function' => $function, + 'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here + 'function' => $function, // TODO: @Meldiron Send only ID to worker to reduce memory usage here ]; };