1
0
Fork 0
mirror of synced 2024-06-17 18:24:51 +12:00

Merge pull request #841 from appwrite/feat-function-schedule

Feat function schedule
This commit is contained in:
Eldad A. Fux 2021-01-27 18:46:19 +02:00 committed by GitHub
commit a8ff09e846
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 112 additions and 38 deletions

View file

@ -44,6 +44,9 @@ App::post('/v1/functions')
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($name, $execute, $env, $vars, $events, $schedule, $timeout, $response, $projectDB) { ->action(function ($name, $execute, $env, $vars, $events, $schedule, $timeout, $response, $projectDB) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
$function = $projectDB->createDocument([ $function = $projectDB->createDocument([
'$collection' => Database::SYSTEM_COLLECTION_FUNCTIONS, '$collection' => Database::SYSTEM_COLLECTION_FUNCTIONS,
'$permissions' => [ '$permissions' => [
@ -91,6 +94,9 @@ App::get('/v1/functions')
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($search, $limit, $offset, $orderType, $response, $projectDB) { ->action(function ($search, $limit, $offset, $orderType, $response, $projectDB) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
$results = $projectDB->getCollection([ $results = $projectDB->getCollection([
'limit' => $limit, 'limit' => $limit,
'offset' => $offset, 'offset' => $offset,
@ -122,6 +128,9 @@ App::get('/v1/functions/:functionId')
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($functionId, $response, $projectDB) { ->action(function ($functionId, $response, $projectDB) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
@ -272,13 +281,19 @@ App::put('/v1/functions/:functionId')
->param('timeout', 15, new Range(1, 900), 'Function maximum execution time in seconds.', true) ->param('timeout', 15, new Range(1, 900), 'Function maximum execution time in seconds.', true)
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($functionId, $name, $execute, $vars, $events, $schedule, $timeout, $response, $projectDB) { ->inject('project')
->action(function ($functionId, $name, $execute, $vars, $events, $schedule, $timeout, $response, $projectDB, $project) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
/** @var Appwrite\Database\Document $project */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
throw new Exception('Function not found', 404); throw new Exception('Function not found', 404);
} }
$original = $function->getAttribute('schedule', '');
$cron = (!empty($function->getAttribute('tag', null)) && !empty($schedule)) ? CronExpression::factory($schedule) : null; $cron = (!empty($function->getAttribute('tag', null)) && !empty($schedule)) ? CronExpression::factory($schedule) : null;
$next = (!empty($function->getAttribute('tag', null)) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : null; $next = (!empty($function->getAttribute('tag', null)) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : null;
@ -291,28 +306,23 @@ App::put('/v1/functions/:functionId')
'vars' => $vars, 'vars' => $vars,
'events' => $events, 'events' => $events,
'schedule' => $schedule, 'schedule' => $schedule,
'schedulePrevious' => null,
'scheduleNext' => $next, 'scheduleNext' => $next,
'timeout' => $timeout, 'timeout' => $timeout,
])); ]));
if ($next) {
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
]);
// ->setParam('projectId', $project->getId())
// ->setParam('event', $route->getLabel('event', ''))
// ->setParam('payload', [])
// ->setParam('functionId', null)
// ->setParam('executionId', null)
// ->setParam('trigger', 'event')
}
if (false === $function) { if (false === $function) {
throw new Exception('Failed saving function to DB', 500); throw new Exception('Failed saving function to DB', 500);
} }
if ($next && $schedule !== $original) {
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
'projectId' => $project->getId(),
'functionId' => $function->getId(),
'executionId' => null,
'trigger' => 'schedule',
]); // Async task rescheduale
}
$response->dynamic($function, Response::MODEL_FUNCTION); $response->dynamic($function, Response::MODEL_FUNCTION);
}); });
@ -331,7 +341,12 @@ App::patch('/v1/functions/:functionId/tag')
->param('tag', '', new UID(), 'Tag unique ID.') ->param('tag', '', new UID(), 'Tag unique ID.')
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($functionId, $tag, $response, $projectDB) { ->inject('project')
->action(function ($functionId, $tag, $response, $projectDB, $project) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
/** @var Appwrite\Database\Document $project */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
$tag = $projectDB->getDocument($tag); $tag = $projectDB->getDocument($tag);
@ -344,14 +359,23 @@ App::patch('/v1/functions/:functionId/tag')
} }
$schedule = $function->getAttribute('schedule', ''); $schedule = $function->getAttribute('schedule', '');
$cron = (!empty($function->getAttribute('tag')&& !empty($schedule))) ? CronExpression::factory($schedule) : null; $cron = (empty($function->getAttribute('tag')) && !empty($schedule)) ? CronExpression::factory($schedule) : null;
$next = (!empty($function->getAttribute('tag')&& !empty($schedule))) ? $cron->getNextRunDate()->format('U') : null; $next = (empty($function->getAttribute('tag')) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : null;
$function = $projectDB->updateDocument(array_merge($function->getArrayCopy(), [ $function = $projectDB->updateDocument(array_merge($function->getArrayCopy(), [
'tag' => $tag->getId(), 'tag' => $tag->getId(),
'scheduleNext' => $next, 'scheduleNext' => $next,
])); ]));
if ($next) { // Init first schedule
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
'projectId' => $project->getId(),
'functionId' => $function->getId(),
'executionId' => null,
'trigger' => 'schedule',
]); // Async task rescheduale
}
if (false === $function) { if (false === $function) {
throw new Exception('Failed saving function to DB', 500); throw new Exception('Failed saving function to DB', 500);
} }
@ -418,6 +442,11 @@ App::post('/v1/functions/:functionId/tags')
->inject('projectDB') ->inject('projectDB')
->inject('usage') ->inject('usage')
->action(function ($functionId, $command, $code, $request, $response, $projectDB, $usage) { ->action(function ($functionId, $command, $code, $request, $response, $projectDB, $usage) {
/** @var Utopia\Swoole\Request $request */
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
/** @var Appwrite\Event\Event $usage */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
@ -506,6 +535,9 @@ App::get('/v1/functions/:functionId/tags')
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) { ->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
@ -545,6 +577,9 @@ App::get('/v1/functions/:functionId/tags/:tagId')
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($functionId, $tagId, $response, $projectDB) { ->action(function ($functionId, $tagId, $response, $projectDB) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
@ -581,6 +616,10 @@ App::delete('/v1/functions/:functionId/tags/:tagId')
->inject('projectDB') ->inject('projectDB')
->inject('usage') ->inject('usage')
->action(function ($functionId, $tagId, $response, $projectDB, $usage) { ->action(function ($functionId, $tagId, $response, $projectDB, $usage) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
/** @var Appwrite\Event\Event $usage */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
@ -727,6 +766,9 @@ App::get('/v1/functions/:functionId/executions')
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) { ->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
@ -766,6 +808,9 @@ App::get('/v1/functions/:functionId/executions/:executionId')
->inject('response') ->inject('response')
->inject('projectDB') ->inject('projectDB')
->action(function ($functionId, $executionId, $response, $projectDB) { ->action(function ($functionId, $executionId, $response, $projectDB) {
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $projectDB */
$function = $projectDB->getDocument($functionId); $function = $projectDB->getDocument($functionId);
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) { if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {

View file

@ -6,6 +6,7 @@ use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
use Appwrite\Database\Adapter\Redis as RedisAdapter; use Appwrite\Database\Adapter\Redis as RedisAdapter;
use Appwrite\Database\Validator\Authorization; use Appwrite\Database\Validator\Authorization;
use Appwrite\Event\Event; use Appwrite\Event\Event;
use Cron\CronExpression;
use Swoole\Runtime; use Swoole\Runtime;
use Utopia\App; use Utopia\App;
use Utopia\CLI\Console; use Utopia\CLI\Console;
@ -27,7 +28,7 @@ $environments = Config::getParam('environments');
$warmupStart = \microtime(true); $warmupStart = \microtime(true);
Co\run(function() use ($environments) { // Warmup: make sure images are ready to run fast 🚀 Co\run(function() use ($environments) { // Warmup: make sure images are ready to run fast 🚀
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL); Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
foreach($environments as $environment) { foreach($environments as $environment) {
go(function() use ($environment) { go(function() use ($environment) {
@ -79,14 +80,6 @@ $stdout = \explode("\n", $stdout);
\parse_str($value, $container); \parse_str($value, $container);
if(isset($container['name'])) { if(isset($container['name'])) {
// $labels = [];
// $temp = explode(',', $container['labels'] ?? []);
// foreach($temp as &$label) {
// $label = explode('=', $label);
// $labels[$label[0] || 0] = $label[1] || '';
// }
$container = [ $container = [
'name' => $container['name'], 'name' => $container['name'],
'online' => (\substr($container['status'], 0, 2) === 'Up'), 'online' => (\substr($container['status'], 0, 2) === 'Up'),
@ -142,6 +135,7 @@ class FunctionsV1
$executionId = $this->args['executionId'] ?? ''; $executionId = $this->args['executionId'] ?? '';
$trigger = $this->args['trigger'] ?? ''; $trigger = $this->args['trigger'] ?? '';
$event = $this->args['event'] ?? ''; $event = $this->args['event'] ?? '';
$scheduleOriginal = $this->args['scheduleOriginal'] ?? '';
$payload = (!empty($this->args['payload'])) ? json_encode($this->args['payload']) : ''; $payload = (!empty($this->args['payload'])) ? json_encode($this->args['payload']) : '';
$database = new Database(); $database = new Database();
@ -190,9 +184,7 @@ class FunctionsV1
Console::success('Triggered function: '.$event); Console::success('Triggered function: '.$event);
Swoole\Coroutine\run(function () use ($projectId, $database, $function, $event, $payload) { $this->execute('event', $projectId, '', $database, $function, $event, $payload);
$this->execute('event', $projectId, '', $database, $function, $event, $payload);
});
} }
} }
break; break;
@ -210,6 +202,45 @@ class FunctionsV1
* On failure add error count * On failure add error count
* If error count bigger than allowed change status to pause * If error count bigger than allowed change status to pause
*/ */
// Reschedule
Authorization::disable();
$function = $database->getDocument($functionId);
Authorization::reset();
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
throw new Exception('Function not found ('.$functionId.')');
}
if($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
return;
}
$cron = CronExpression::factory($function->getAttribute('schedule'));
$next = (int) $cron->getNextRunDate()->format('U');
$function
->setAttribute('scheduleNext', $next)
->setAttribute('schedulePrevious', \time())
;
Authorization::disable();
$function = $database->updateDocument(array_merge($function->getArrayCopy(), [
'scheduleNext' => $next,
]));
Authorization::reset();
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
'projectId' => $projectId,
'functionId' => $function->getId(),
'executionId' => null,
'trigger' => 'schedule',
'scheduleOriginal' => $function->getAttribute('schedule', ''),
]); // Async task rescheduale
$this->execute($trigger, $projectId, $executionId, $database, $function);
break; break;
@ -222,9 +253,7 @@ class FunctionsV1
throw new Exception('Function not found ('.$functionId.')'); throw new Exception('Function not found ('.$functionId.')');
} }
Swoole\Coroutine\run(function () use ($trigger, $projectId, $executionId, $database, $function) { $this->execute($trigger, $projectId, $executionId, $database, $function);
$this->execute($trigger, $projectId, $executionId, $database, $function);
});
break; break;
default: default:
@ -278,8 +307,8 @@ class FunctionsV1
'time' => 0, 'time' => 0,
]); ]);
if(false === $execution) { if(false === $execution || ($execution instanceof Document && $execution->isEmpty())) {
throw new Exception('Failed to create execution'); throw new Exception('Failed to create or read execution');
} }
Authorization::reset(); Authorization::reset();

View file

@ -219,7 +219,7 @@ class Database
/** /**
* @param array $data * @param array $data
* *
* @return Document|bool * @return Document
* *
* @throws AuthorizationException * @throws AuthorizationException
* @throws StructureException * @throws StructureException

View file

@ -537,7 +537,7 @@ class FunctionsCustomServerTest extends Scope
], ],
]; ];
sleep(count($envs) * 25); sleep(count($envs) * 20);
/** /**
* Test for SUCCESS * Test for SUCCESS