diff --git a/app/cli.php b/app/cli.php index b176326191..502ee77b75 100644 --- a/app/cli.php +++ b/app/cli.php @@ -56,20 +56,29 @@ CLI::setResource('dbForConsole', function ($pools, $cache) { }, ['pools', 'cache']); CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { - $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache) { + $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools + + $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } + $databaseName = $project->getAttribute('database'); + + if (isset($databases[$databaseName])) { + return $databases[$databaseName]; + } + $dbAdapter = $pools - ->get($project->getAttribute('database')) + ->get($databaseName) ->pop() - ->getResource() - ; + ->getResource(); $database = new Database($dbAdapter, $cache); $database->setNamespace('_' . $project->getInternalId()); + $databases[$databaseName] = $database; + return $database; }; diff --git a/src/Appwrite/Platform/Services/Tasks.php b/src/Appwrite/Platform/Services/Tasks.php index 7f6a062ed4..2968a66b95 100644 --- a/src/Appwrite/Platform/Services/Tasks.php +++ b/src/Appwrite/Platform/Services/Tasks.php @@ -7,6 +7,7 @@ use Appwrite\Platform\Tasks\Doctor; use Appwrite\Platform\Tasks\Install; use Appwrite\Platform\Tasks\Maintenance; use Appwrite\Platform\Tasks\Migrate; +use Appwrite\Platform\Tasks\Schedule; use Appwrite\Platform\Tasks\SDKs; use Appwrite\Platform\Tasks\Specs; use Appwrite\Platform\Tasks\SSL; @@ -28,6 +29,7 @@ class Tasks extends Service ->addAction(Doctor::getName(), new Doctor()) ->addAction(Install::getName(), new Install()) ->addAction(Maintenance::getName(), new Maintenance()) + ->addAction(Schedule::getName(), new Schedule()) ->addAction(Migrate::getName(), new Migrate()) ->addAction(SDKs::getName(), new SDKs()) ->addAction(VolumeSync::getName(), new VolumeSync()) diff --git a/src/Appwrite/Platform/Tasks/Maintenance.php b/src/Appwrite/Platform/Tasks/Maintenance.php index fe659c8746..307f502611 100644 --- a/src/Appwrite/Platform/Tasks/Maintenance.php +++ b/src/Appwrite/Platform/Tasks/Maintenance.php @@ -139,6 +139,8 @@ class Maintenance extends Action notifyDeleteExpiredSessions(); renewCertificates($dbForConsole); notifyDeleteCache($cacheRetention); + + // TODO: @Meldiron Every probably 24h, look for schedules with active=false, that doesnt have function anymore. Dlete such schedule }, $interval); } } diff --git a/src/Appwrite/Platform/Tasks/Schedule.php b/src/Appwrite/Platform/Tasks/Schedule.php new file mode 100644 index 0000000000..0e48d23883 --- /dev/null +++ b/src/Appwrite/Platform/Tasks/Schedule.php @@ -0,0 +1,225 @@ +desc('Execute functions scheduled in Appwrite') + ->inject('pools') + ->inject('dbForConsole') + ->inject('getProjectDB') + ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); + } + + /** + * 1. Load all documents from 'schedules' collection to create local copy + * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute + * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. + */ + public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void + { + Console::title('Scheduler V1'); + Console::success(APP_NAME . ' Scheduler v1 has started'); + + /** + * Extract only nessessary attributes to lower memory used. + * + * @var Document $schedule + * @return array + */ + $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { + $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); + + $function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); + + return [ + 'resourceId' => $schedule->getAttribute('resourceId'), + 'schedule' => $schedule->getAttribute('schedule'), + 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), + 'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here + 'function' => $function, // TODO: @Meldiron Send only ID to worker to reduce memory usage here + ]; + }; + + $schedules = []; // Local copy of 'schedules' collection + $lastSyncUpdate = DateTime::now(); + + $limit = 10000; + $sum = $limit; + $total = 0; + $loadStart = \microtime(true); + $latestDocument = null; + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('resourceType', ['function']), + Query::equal('active', [true]), + ])); + + $sum = count($results); + $total = $total + $sum; + foreach ($results as $document) { + $schedules[$document['resourceId']] = $getSchedule($document); + } + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $pools->reclaim(); + + Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); + + Console::success("Starting timers at " . DateTime::now()); + + run( + function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { + /** + * The timer synchronize $schedules copy with database collection. + */ + Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { + $time = DateTime::now(); + $timerStart = \microtime(true); + + $limit = 1000; + $sum = $limit; + $total = 0; + $latestDocument = null; + + Console::log("Sync tick: Running at $time"); + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('resourceType', ['function']), + Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), + ])); + + $sum = count($results); + $total = $total + $sum; + foreach ($results as $document) { + $localDocument = $schedules[$document['resourceId']] ?? null; + + $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; + $new = strtotime($document['resourceUpdatedAt']); + + if ($document['active'] === false) { + Console::info("Removing: {$document['resourceId']}"); + unset($schedules[$document['resourceId']]); + } elseif ($new !== $org) { + Console::info("Updating: {$document['resourceId']}"); + $schedules[$document['resourceId']] = $getSchedule($document); + } + } + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $lastSyncUpdate = $time; + $timerEnd = \microtime(true); + + $pools->reclaim(); + + Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds"); + }); + + /** + * The timer to prepare soon-to-execute schedules. + */ + $lastEnqueueUpdate = null; + $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { + $timerStart = \microtime(true); + $time = DateTime::now(); + + $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; + $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); + + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); + + $total = 0; + + $delayedExecutions = []; // Group executions with same delay to share one coroutine + + foreach ($schedules as $key => $schedule) { + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); + $next = DateTime::format($nextDate); + + $currentTick = $next < $timeFrame; + + if (!$currentTick) { + continue; + } + + $total++; + + $promiseStart = \microtime(true); // in seconds + $executionStart = $nextDate->getTimestamp(); // in seconds + $executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued + + $delay = \ceil(\intval($executionSleep)); + + if (!isset($delayedExecutions[$delay])) { + $delayedExecutions[$delay] = []; + } + + $delayedExecutions[$delay][] = $key; + } + + foreach ($delayedExecutions as $delay => $scheduleKeys) { + \go(function () use ($delay, $schedules, $scheduleKeys) { + \sleep($delay); // in seconds + + foreach ($scheduleKeys as $scheduleKey) { + // Ensure schedule was not deleted + if (!isset($schedules[$scheduleKey])) { + return; + } + + Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue + } + }); + } + + $timerEnd = \microtime(true); + $lastEnqueueUpdate = $timerStart; + Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds"); + }; + + Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); + $enqueueFunctions(); + } + ); + } +} diff --git a/src/Appwrite/Platform/Tasks/schedule.php b/src/Appwrite/Platform/Tasks/schedule.php deleted file mode 100644 index 1b301def5b..0000000000 --- a/src/Appwrite/Platform/Tasks/schedule.php +++ /dev/null @@ -1,231 +0,0 @@ -task('schedule') -->desc('Function scheduler task') -->action(function () use ($register) { - Console::title('Scheduler V1'); - Console::success(APP_NAME . ' Scheduler v1 has started'); - - $dbForConsole = getConsoleDB(); - - /** - * @return void - */ - $createQueue = function () use (&$functions, &$queue): void { - $loadStart = \microtime(true); - /** - * Creating smaller functions list containing 5-min timeframe. - */ - $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME); - foreach ($functions as $function) { - $cron = new CronExpression($function['schedule']); - $next = DateTime::format($cron->getNextRunDate()); - - if ($next < $timeFrame) { - $queue[$next][$function['resourceId']] = $function; - } - } - - Console::success("Queue was built in " . (microtime(true) - $loadStart) . " seconds"); - }; - - /** - * @param string $id - * @param string $resourceId - * @return void - */ - $removeFromQueue = function (string $resourceId) use (&$queue, &$functions, $dbForConsole) { - if (array_key_exists($resourceId, $functions)) { - unset($functions[$resourceId]); - Console::error("Removing :{$resourceId} from functions list"); - } - - foreach ($queue as $slot => $schedule) { - if (array_key_exists($resourceId, $schedule)) { - unset($queue[$slot][$resourceId]); - Console::error("Removing :{$resourceId} from queue slot $slot"); - } - } - }; - - /** - * @param string $resourceId - * @param array $update - * @return void - */ - $updateQueue = function (string $resourceId, array $update) use (&$queue, &$functions): void { - - $functions[$resourceId] = $update; - Console::error("Updating :{$resourceId} in functions list"); - - foreach ($queue as $slot => $schedule) { - if (array_key_exists($resourceId, $schedule)) { - $queue[$slot][$resourceId] = $update; - Console::error("Updating :{$resourceId} in queue slot $slot"); - } - } - }; - - /** - * @var Document $schedule - * @return array - */ - $getSchedule = function (Document $schedule) use ($dbForConsole): array { - $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('schedule')); - - return [ - 'resourceId' => $schedule->getAttribute('resourceId'), - 'schedule' => $schedule->getAttribute('schedule'), - 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), - 'project' => $project, - //'function' => getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')) - ]; - }; - - $limit = 10000; - $sum = $limit; - $functions = []; - $queue = []; - $total = 0; - $loadStart = \microtime(true); - $latestDocument = null; - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::equal('resourceType', ['function']), - Query::equal('active', [true]), - ])); - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $document) { - $functions[$document['resourceId']] = $getSchedule($document); - } - - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds"); - $createQueue(); - $lastUpdate = DateTime::addSeconds(new \DateTime(), -600); // 10 min - - Co\run( - function () use ($getSchedule, $updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { - Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($getSchedule, $updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { - $time = DateTime::now(); - $limit = 1000; - $sum = $limit; - $total = 0; - $latestDocument = null; - $timerStart = \microtime(true); - - Console::warning("Update proc started at: $time last update was at $lastUpdate"); - - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ - Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::equal('resourceType', ['function']), - Query::greaterThanEqual('resourceUpdatedAt', $lastUpdate), - ])); - - $sum = count($results); - $total = $total + $sum; - foreach ($results as $document) { - $org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null; - $new = strtotime($document['resourceUpdatedAt']); - if ($document['active'] === false) { - $removeFromQueue($document['resourceId']); - } elseif ($new > $org) { - $updateQueue($document['resourceId'], $getSchedule($document)); - } - } - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; - } - - $lastUpdate = DateTime::now(); - $createQueue(); - Console::warning("Update timer: {$total} functions where updated in " . (microtime(true) - $timerStart) . " seconds"); - }); - - Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) { - $timerStart = \microtime(true); - $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME); - $slot = (new \DateTime())->format('Y-m-d H:i:00.000'); - - Console::info("Enqueue proc started at: " . DateTime::now()); - - $count = 0; - if (array_key_exists($slot, $queue)) { - $schedule = $queue[$slot]; - - foreach ($schedule as $function) { - if (empty($functions[$function['resourceId']])) { - continue; - } - - $cron = new CronExpression($function['schedule']); - $next = DateTime::format($cron->getNextRunDate()); - - /** - * If next schedule is in 5-min timeframe - * and it was not removed or changed, re-enqueue the function. - */ - if ( - $next < $timeFrame && - $function['schedule'] ?? [] === $functions[$function['resourceId']]['schedule'] - ) { - $queue[$next][$function['resourceId']] = $function; - } - unset($queue[$slot][$function['resourceId']]); /** removing function from slot */ - $count++; - } - unset($queue[$slot]); /** removing slot */ - } - - $timerEnd = \microtime(true); - Console::info("Queue timer: finished in " . ($timerEnd - $timerStart) . " seconds with {$count} functions"); - }); - } - ); -});