diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 733a20ddd..3f25755d1 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -44,6 +44,9 @@ App::post('/v1/functions') ->inject('response') ->inject('projectDB') ->action(function ($name, $execute, $env, $vars, $events, $schedule, $timeout, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->createDocument([ '$collection' => Database::SYSTEM_COLLECTION_FUNCTIONS, '$permissions' => [ @@ -91,6 +94,9 @@ App::get('/v1/functions') ->inject('response') ->inject('projectDB') ->action(function ($search, $limit, $offset, $orderType, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $results = $projectDB->getCollection([ 'limit' => $limit, 'offset' => $offset, @@ -122,6 +128,9 @@ App::get('/v1/functions/:functionId') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -272,13 +281,19 @@ App::put('/v1/functions/:functionId') ->param('timeout', 15, new Range(1, 900), 'Function maximum execution time in seconds.', true) ->inject('response') ->inject('projectDB') - ->action(function ($functionId, $name, $execute, $vars, $events, $schedule, $timeout, $response, $projectDB) { + ->inject('project') + ->action(function ($functionId, $name, $execute, $vars, $events, $schedule, $timeout, $response, $projectDB, $project) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + /** @var Appwrite\Database\Document $project */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { throw new Exception('Function not found', 404); } + $original = $function->getAttribute('schedule', ''); $cron = (!empty($function->getAttribute('tag', null)) && !empty($schedule)) ? CronExpression::factory($schedule) : null; $next = (!empty($function->getAttribute('tag', null)) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : null; @@ -291,28 +306,23 @@ App::put('/v1/functions/:functionId') 'vars' => $vars, 'events' => $events, 'schedule' => $schedule, - 'schedulePrevious' => null, 'scheduleNext' => $next, - 'timeout' => $timeout, + 'timeout' => $timeout, ])); - if ($next) { - ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ - - ]); - - // ->setParam('projectId', $project->getId()) - // ->setParam('event', $route->getLabel('event', '')) - // ->setParam('payload', []) - // ->setParam('functionId', null) - // ->setParam('executionId', null) - // ->setParam('trigger', 'event') - } - if (false === $function) { throw new Exception('Failed saving function to DB', 500); } + if ($next && $schedule !== $original) { + ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ + 'projectId' => $project->getId(), + 'functionId' => $function->getId(), + 'executionId' => null, + 'trigger' => 'schedule', + ]); // Async task rescheduale + } + $response->dynamic($function, Response::MODEL_FUNCTION); }); @@ -331,7 +341,12 @@ App::patch('/v1/functions/:functionId/tag') ->param('tag', '', new UID(), 'Tag unique ID.') ->inject('response') ->inject('projectDB') - ->action(function ($functionId, $tag, $response, $projectDB) { + ->inject('project') + ->action(function ($functionId, $tag, $response, $projectDB, $project) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + /** @var Appwrite\Database\Document $project */ + $function = $projectDB->getDocument($functionId); $tag = $projectDB->getDocument($tag); @@ -344,14 +359,23 @@ App::patch('/v1/functions/:functionId/tag') } $schedule = $function->getAttribute('schedule', ''); - $cron = (!empty($function->getAttribute('tag')&& !empty($schedule))) ? CronExpression::factory($schedule) : null; - $next = (!empty($function->getAttribute('tag')&& !empty($schedule))) ? $cron->getNextRunDate()->format('U') : null; + $cron = (empty($function->getAttribute('tag')) && !empty($schedule)) ? CronExpression::factory($schedule) : null; + $next = (empty($function->getAttribute('tag')) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : null; $function = $projectDB->updateDocument(array_merge($function->getArrayCopy(), [ 'tag' => $tag->getId(), 'scheduleNext' => $next, ])); + if ($next) { // Init first schedule + ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ + 'projectId' => $project->getId(), + 'functionId' => $function->getId(), + 'executionId' => null, + 'trigger' => 'schedule', + ]); // Async task rescheduale + } + if (false === $function) { throw new Exception('Failed saving function to DB', 500); } @@ -418,6 +442,11 @@ App::post('/v1/functions/:functionId/tags') ->inject('projectDB') ->inject('usage') ->action(function ($functionId, $command, $code, $request, $response, $projectDB, $usage) { + /** @var Utopia\Swoole\Request $request */ + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + /** @var Appwrite\Event\Event $usage */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -506,6 +535,9 @@ App::get('/v1/functions/:functionId/tags') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -545,6 +577,9 @@ App::get('/v1/functions/:functionId/tags/:tagId') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $tagId, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -581,6 +616,10 @@ App::delete('/v1/functions/:functionId/tags/:tagId') ->inject('projectDB') ->inject('usage') ->action(function ($functionId, $tagId, $response, $projectDB, $usage) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + /** @var Appwrite\Event\Event $usage */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -727,6 +766,9 @@ App::get('/v1/functions/:functionId/executions') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -766,6 +808,9 @@ App::get('/v1/functions/:functionId/executions/:executionId') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $executionId, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { diff --git a/app/workers/functions.php b/app/workers/functions.php index cc276ed52..c3a40f081 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -6,6 +6,7 @@ use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Validator\Authorization; use Appwrite\Event\Event; +use Cron\CronExpression; use Swoole\Runtime; use Utopia\App; use Utopia\CLI\Console; @@ -27,7 +28,7 @@ $environments = Config::getParam('environments'); $warmupStart = \microtime(true); Co\run(function() use ($environments) { // Warmup: make sure images are ready to run fast 🚀 - Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL); + Runtime::enableCoroutine(SWOOLE_HOOK_ALL); foreach($environments as $environment) { go(function() use ($environment) { @@ -79,14 +80,6 @@ $stdout = \explode("\n", $stdout); \parse_str($value, $container); if(isset($container['name'])) { - // $labels = []; - // $temp = explode(',', $container['labels'] ?? []); - - // foreach($temp as &$label) { - // $label = explode('=', $label); - // $labels[$label[0] || 0] = $label[1] || ''; - // } - $container = [ 'name' => $container['name'], 'online' => (\substr($container['status'], 0, 2) === 'Up'), @@ -142,6 +135,7 @@ class FunctionsV1 $executionId = $this->args['executionId'] ?? ''; $trigger = $this->args['trigger'] ?? ''; $event = $this->args['event'] ?? ''; + $scheduleOriginal = $this->args['scheduleOriginal'] ?? ''; $payload = (!empty($this->args['payload'])) ? json_encode($this->args['payload']) : ''; $database = new Database(); @@ -190,9 +184,7 @@ class FunctionsV1 Console::success('Triggered function: '.$event); - Swoole\Coroutine\run(function () use ($projectId, $database, $function, $event, $payload) { - $this->execute('event', $projectId, '', $database, $function, $event, $payload); - }); + $this->execute('event', $projectId, '', $database, $function, $event, $payload); } } break; @@ -210,6 +202,45 @@ class FunctionsV1 * On failure add error count * If error count bigger than allowed change status to pause */ + + // Reschedule + Authorization::disable(); + $function = $database->getDocument($functionId); + Authorization::reset(); + + if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { + throw new Exception('Function not found ('.$functionId.')'); + } + + if($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run. + return; + } + + $cron = CronExpression::factory($function->getAttribute('schedule')); + $next = (int) $cron->getNextRunDate()->format('U'); + + $function + ->setAttribute('scheduleNext', $next) + ->setAttribute('schedulePrevious', \time()) + ; + + Authorization::disable(); + + $function = $database->updateDocument(array_merge($function->getArrayCopy(), [ + 'scheduleNext' => $next, + ])); + + Authorization::reset(); + + ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ + 'projectId' => $projectId, + 'functionId' => $function->getId(), + 'executionId' => null, + 'trigger' => 'schedule', + 'scheduleOriginal' => $function->getAttribute('schedule', ''), + ]); // Async task rescheduale + + $this->execute($trigger, $projectId, $executionId, $database, $function); break; @@ -222,9 +253,7 @@ class FunctionsV1 throw new Exception('Function not found ('.$functionId.')'); } - Swoole\Coroutine\run(function () use ($trigger, $projectId, $executionId, $database, $function) { - $this->execute($trigger, $projectId, $executionId, $database, $function); - }); + $this->execute($trigger, $projectId, $executionId, $database, $function); break; default: @@ -278,8 +307,8 @@ class FunctionsV1 'time' => 0, ]); - if(false === $execution) { - throw new Exception('Failed to create execution'); + if(false === $execution || ($execution instanceof Document && $execution->isEmpty())) { + throw new Exception('Failed to create or read execution'); } Authorization::reset(); diff --git a/src/Appwrite/Database/Database.php b/src/Appwrite/Database/Database.php index c4e1370e1..3005a3eec 100644 --- a/src/Appwrite/Database/Database.php +++ b/src/Appwrite/Database/Database.php @@ -219,7 +219,7 @@ class Database /** * @param array $data * - * @return Document|bool + * @return Document * * @throws AuthorizationException * @throws StructureException diff --git a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php index ae62711c6..41a66b153 100644 --- a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php +++ b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php @@ -537,7 +537,7 @@ class FunctionsCustomServerTest extends Scope ], ]; - sleep(count($envs) * 25); + sleep(count($envs) * 20); /** * Test for SUCCESS