From 47f5e871baa07f2833db79ce103ece5075247313 Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Sun, 17 Jan 2021 01:38:13 +0200 Subject: [PATCH 1/8] Added scheduling --- app/controllers/api/functions.php | 82 ++++++++++++++++++++++++------- app/workers/functions.php | 49 ++++++++++++++---- 2 files changed, 103 insertions(+), 28 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 0572b8137..7a9996fbd 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -44,6 +44,9 @@ App::post('/v1/functions') ->inject('response') ->inject('projectDB') ->action(function ($name, $execute, $env, $vars, $events, $schedule, $timeout, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->createDocument([ '$collection' => Database::SYSTEM_COLLECTION_FUNCTIONS, '$permissions' => [ @@ -91,6 +94,9 @@ App::get('/v1/functions') ->inject('response') ->inject('projectDB') ->action(function ($search, $limit, $offset, $orderType, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $results = $projectDB->getCollection([ 'limit' => $limit, 'offset' => $offset, @@ -122,6 +128,9 @@ App::get('/v1/functions/:functionId') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -272,7 +281,12 @@ App::put('/v1/functions/:functionId') ->param('timeout', 15, new Range(1, 900), 'Function maximum execution time in seconds.', true) ->inject('response') ->inject('projectDB') - ->action(function ($functionId, $name, $execute, $vars, $events, $schedule, $timeout, $response, $projectDB) { + ->inject('project') + ->action(function ($functionId, $name, $execute, $vars, $events, $schedule, $timeout, $response, $projectDB, $project) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + /** @var Appwrite\Database\Document $project */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -291,28 +305,23 @@ App::put('/v1/functions/:functionId') 'vars' => $vars, 'events' => $events, 'schedule' => $schedule, - 'schedulePrevious' => null, 'scheduleNext' => $next, - 'timeout' => $timeout, + 'timeout' => $timeout, ])); - if ($next) { - ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ - - ]); - - // ->setParam('projectId', $project->getId()) - // ->setParam('event', $route->getLabel('event', '')) - // ->setParam('payload', []) - // ->setParam('functionId', null) - // ->setParam('executionId', null) - // ->setParam('trigger', 'event') - } - if (false === $function) { throw new Exception('Failed saving function to DB', 500); } + if ($next) { + ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ + 'projectId' => $project->getId(), + 'functionId' => $function->getId(), + 'executionId' => null, + 'trigger' => 'schedule', + ]); // Async task rescheduale + } + $response->dynamic($function, Response::MODEL_FUNCTION); }); @@ -331,7 +340,12 @@ App::patch('/v1/functions/:functionId/tag') ->param('tag', '', new UID(), 'Tag unique ID.') ->inject('response') ->inject('projectDB') - ->action(function ($functionId, $tag, $response, $projectDB) { + ->inject('project') + ->action(function ($functionId, $tag, $response, $projectDB, $project) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + /** @var Appwrite\Database\Document $project */ + $function = $projectDB->getDocument($functionId); $tag = $projectDB->getDocument($tag); @@ -344,14 +358,23 @@ App::patch('/v1/functions/:functionId/tag') } $schedule = $function->getAttribute('schedule', ''); - $cron = (!empty($function->getAttribute('tag')&& !empty($schedule))) ? CronExpression::factory($schedule) : null; - $next = (!empty($function->getAttribute('tag')&& !empty($schedule))) ? $cron->getNextRunDate()->format('U') : null; + $cron = (empty($function->getAttribute('tag') && !empty($schedule))) ? CronExpression::factory($schedule) : null; + $next = (empty($function->getAttribute('tag') && !empty($schedule))) ? $cron->getNextRunDate()->format('U') : null; $function = $projectDB->updateDocument(array_merge($function->getArrayCopy(), [ 'tag' => $tag->getId(), 'scheduleNext' => $next, ])); + if ($next) { // Init first schedule + ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ + 'projectId' => $project->getId(), + 'functionId' => $function->getId(), + 'executionId' => null, + 'trigger' => 'schedule', + ]); // Async task rescheduale + } + if (false === $function) { throw new Exception('Failed saving function to DB', 500); } @@ -418,6 +441,11 @@ App::post('/v1/functions/:functionId/tags') ->inject('projectDB') ->inject('usage') ->action(function ($functionId, $command, $code, $request, $response, $projectDB, $usage) { + /** @var Utopia\Swoole\Request $request */ + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + /** @var Appwrite\Event\Event $usage */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -506,6 +534,9 @@ App::get('/v1/functions/:functionId/tags') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -545,6 +576,9 @@ App::get('/v1/functions/:functionId/tags/:tagId') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $tagId, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -581,6 +615,10 @@ App::delete('/v1/functions/:functionId/tags/:tagId') ->inject('projectDB') ->inject('usage') ->action(function ($functionId, $tagId, $response, $projectDB, $usage) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + /** @var Appwrite\Event\Event $usage */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -727,6 +765,9 @@ App::get('/v1/functions/:functionId/executions') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { @@ -766,6 +807,9 @@ App::get('/v1/functions/:functionId/executions/:executionId') ->inject('response') ->inject('projectDB') ->action(function ($functionId, $executionId, $response, $projectDB) { + /** @var Appwrite\Utopia\Response $response */ + /** @var Appwrite\Database\Database $projectDB */ + $function = $projectDB->getDocument($functionId); if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { diff --git a/app/workers/functions.php b/app/workers/functions.php index 2712dc74f..222b76a60 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -6,6 +6,7 @@ use Appwrite\Database\Adapter\MySQL as MySQLAdapter; use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Validator\Authorization; use Appwrite\Event\Event; +use Cron\CronExpression; use Swoole\Runtime; use Utopia\App; use Utopia\CLI\Console; @@ -27,7 +28,7 @@ $environments = Config::getParam('environments'); $warmupStart = \microtime(true); Co\run(function() use ($environments) { // Warmup: make sure images are ready to run fast 🚀 - Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL); + Runtime::enableCoroutine(SWOOLE_HOOK_ALL); foreach($environments as $environment) { go(function() use ($environment) { @@ -79,14 +80,6 @@ $stdout = \explode("\n", $stdout); \parse_str($value, $container); if(isset($container['name'])) { - // $labels = []; - // $temp = explode(',', $container['labels'] ?? []); - - // foreach($temp as &$label) { - // $label = explode('=', $label); - // $labels[$label[0] || 0] = $label[1] || ''; - // } - $container = [ 'name' => $container['name'], 'online' => (\substr($container['status'], 0, 2) === 'Up'), @@ -142,6 +135,7 @@ class FunctionsV1 $executionId = $this->args['executionId'] ?? ''; $trigger = $this->args['trigger'] ?? ''; $event = $this->args['event'] ?? ''; + $scheduleOriginal = $this->args['scheduleOriginal'] ?? ''; $payload = (!empty($this->args['payload'])) ? json_encode($this->args['payload']) : ''; $database = new Database(); @@ -210,6 +204,43 @@ class FunctionsV1 * On failure add error count * If error count bigger than allowed change status to pause */ + + // Reschedule + Authorization::disable(); + $function = $database->getDocument($functionId); + Authorization::reset(); + + if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { + throw new Exception('Function not found ('.$functionId.')'); + } + + if($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run. + return; + } + + $cron = CronExpression::factory($function->getAttribute('schedule')); + $next = (int) $cron->getNextRunDate()->format('U'); + + $function + ->setAttribute('scheduleNext', $next) + ->setAttribute('schedulePrevious', \time()) + ; + + $function = $database->updateDocument(array_merge($function->getArrayCopy(), [ + 'scheduleNext' => $next, + ])); + + ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ + 'projectId' => $projectId, + 'functionId' => $function->getId(), + 'executionId' => null, + 'trigger' => 'schedule', + 'scheduleOriginal' => $function->getAttribute('schedule', ''), + ]); // Async task rescheduale + + Swoole\Coroutine\run(function () use ($trigger, $projectId, $executionId, $database, $function) { + $this->execute($trigger, $projectId, $executionId, $database, $function); + }); break; From f16113dbb3a6dc1b99d3a1566283b81042f2e401 Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Sun, 17 Jan 2021 02:07:43 +0200 Subject: [PATCH 2/8] Some minor fixes --- app/controllers/api/functions.php | 8 +++++--- app/workers/functions.php | 4 ++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 7a9996fbd..f8f70796c 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -293,6 +293,7 @@ App::put('/v1/functions/:functionId') throw new Exception('Function not found', 404); } + $original = $function->getAttribute('schedule', ''); $cron = (!empty($function->getAttribute('tag', null)) && !empty($schedule)) ? CronExpression::factory($schedule) : null; $next = (!empty($function->getAttribute('tag', null)) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : null; @@ -313,7 +314,7 @@ App::put('/v1/functions/:functionId') throw new Exception('Failed saving function to DB', 500); } - if ($next) { + if ($next && $schedule !== $original) { ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ 'projectId' => $project->getId(), 'functionId' => $function->getId(), @@ -358,8 +359,9 @@ App::patch('/v1/functions/:functionId/tag') } $schedule = $function->getAttribute('schedule', ''); - $cron = (empty($function->getAttribute('tag') && !empty($schedule))) ? CronExpression::factory($schedule) : null; - $next = (empty($function->getAttribute('tag') && !empty($schedule))) ? $cron->getNextRunDate()->format('U') : null; + var_dump($schedule); + $cron = (empty($function->getAttribute('tag')) && !empty($schedule)) ? CronExpression::factory($schedule) : null; + $next = (empty($function->getAttribute('tag')) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : null; $function = $projectDB->updateDocument(array_merge($function->getArrayCopy(), [ 'tag' => $tag->getId(), diff --git a/app/workers/functions.php b/app/workers/functions.php index 222b76a60..6c3ba252b 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -226,10 +226,14 @@ class FunctionsV1 ->setAttribute('schedulePrevious', \time()) ; + Authorization::disable(); + $function = $database->updateDocument(array_merge($function->getArrayCopy(), [ 'scheduleNext' => $next, ])); + Authorization::reset(); + ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [ 'projectId' => $projectId, 'functionId' => $function->getId(), From 26faaae5fce52ea74d269cdc0352b8a3a8e70d6e Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Sun, 17 Jan 2021 02:08:16 +0200 Subject: [PATCH 3/8] Removed log --- app/controllers/api/functions.php | 1 - 1 file changed, 1 deletion(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index f8f70796c..56bf9bfb2 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -359,7 +359,6 @@ App::patch('/v1/functions/:functionId/tag') } $schedule = $function->getAttribute('schedule', ''); - var_dump($schedule); $cron = (empty($function->getAttribute('tag')) && !empty($schedule)) ? CronExpression::factory($schedule) : null; $next = (empty($function->getAttribute('tag')) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : null; From d908606509c31362fd1652865788910651f7b304 Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Wed, 20 Jan 2021 06:49:00 +0200 Subject: [PATCH 4/8] Minor patch --- app/workers/functions.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/workers/functions.php b/app/workers/functions.php index 6c3ba252b..82b27754b 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -313,8 +313,8 @@ class FunctionsV1 'time' => 0, ]); - if(false === $execution) { - throw new Exception('Failed to create execution'); + if(false === $execution || ($execution instanceof Document && $execution->isEmpty())) { + throw new Exception('Failed to create or read execution'); } Authorization::reset(); From fdd413a4b399afd6560ff8fea79f46a574a9f717 Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Tue, 26 Jan 2021 21:16:03 +0200 Subject: [PATCH 5/8] Removed not working co-runs --- app/workers/functions.php | 12 +++--------- src/Appwrite/Database/Database.php | 2 +- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/app/workers/functions.php b/app/workers/functions.php index 82b27754b..3265a1c51 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -184,9 +184,7 @@ class FunctionsV1 Console::success('Triggered function: '.$event); - Swoole\Coroutine\run(function () use ($projectId, $database, $function, $event, $payload) { - $this->execute('event', $projectId, '', $database, $function, $event, $payload); - }); + $this->execute('event', $projectId, '', $database, $function, $event, $payload); } } break; @@ -242,9 +240,7 @@ class FunctionsV1 'scheduleOriginal' => $function->getAttribute('schedule', ''), ]); // Async task rescheduale - Swoole\Coroutine\run(function () use ($trigger, $projectId, $executionId, $database, $function) { - $this->execute($trigger, $projectId, $executionId, $database, $function); - }); + $this->execute($trigger, $projectId, $executionId, $database, $function); break; @@ -257,9 +253,7 @@ class FunctionsV1 throw new Exception('Function not found ('.$functionId.')'); } - Swoole\Coroutine\run(function () use ($trigger, $projectId, $executionId, $database, $function) { - $this->execute($trigger, $projectId, $executionId, $database, $function); - }); + $this->execute($trigger, $projectId, $executionId, $database, $function); break; default: diff --git a/src/Appwrite/Database/Database.php b/src/Appwrite/Database/Database.php index 8610aeac1..c2297fd39 100644 --- a/src/Appwrite/Database/Database.php +++ b/src/Appwrite/Database/Database.php @@ -214,7 +214,7 @@ class Database /** * @param array $data * - * @return Document|bool + * @return Document * * @throws AuthorizationException * @throws StructureException From 211ecf5d81130d585adc1bd2366cec1622e5c846 Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Tue, 26 Jan 2021 22:37:03 +0200 Subject: [PATCH 6/8] Test new sleep --- tests/e2e/Services/Functions/FunctionsCustomServerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php index ae62711c6..ffcfcc2ee 100644 --- a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php +++ b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php @@ -537,7 +537,7 @@ class FunctionsCustomServerTest extends Scope ], ]; - sleep(count($envs) * 25); + sleep(count($envs) * 5); /** * Test for SUCCESS From 22b1a6aff3a210d13f756fb1a1d45184c31f7abd Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Tue, 26 Jan 2021 23:38:15 +0200 Subject: [PATCH 7/8] Updated sleep time 2 --- tests/e2e/Services/Functions/FunctionsCustomServerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php index ffcfcc2ee..da3566b8f 100644 --- a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php +++ b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php @@ -537,7 +537,7 @@ class FunctionsCustomServerTest extends Scope ], ]; - sleep(count($envs) * 5); + sleep(count($envs) * 10); /** * Test for SUCCESS From d250b472e3ad41c8fd7034e2c296669d001c6e4b Mon Sep 17 00:00:00 2001 From: Eldad Fux Date: Wed, 27 Jan 2021 00:50:11 +0200 Subject: [PATCH 8/8] Updated speed safty factor 3 --- tests/e2e/Services/Functions/FunctionsCustomServerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php index da3566b8f..41a66b153 100644 --- a/tests/e2e/Services/Functions/FunctionsCustomServerTest.php +++ b/tests/e2e/Services/Functions/FunctionsCustomServerTest.php @@ -537,7 +537,7 @@ class FunctionsCustomServerTest extends Scope ], ]; - sleep(count($envs) * 10); + sleep(count($envs) * 20); /** * Test for SUCCESS