Added scheduling
This commit is contained in:
parent
1da4fa8168
commit
47f5e871ba
2 changed files with 103 additions and 28 deletions
|
@ -44,6 +44,9 @@ App::post('/v1/functions')
|
|||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($name, $execute, $env, $vars, $events, $schedule, $timeout, $response, $projectDB) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
|
||||
$function = $projectDB->createDocument([
|
||||
'$collection' => Database::SYSTEM_COLLECTION_FUNCTIONS,
|
||||
'$permissions' => [
|
||||
|
@ -91,6 +94,9 @@ App::get('/v1/functions')
|
|||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($search, $limit, $offset, $orderType, $response, $projectDB) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
|
||||
$results = $projectDB->getCollection([
|
||||
'limit' => $limit,
|
||||
'offset' => $offset,
|
||||
|
@ -122,6 +128,9 @@ App::get('/v1/functions/:functionId')
|
|||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($functionId, $response, $projectDB) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
|
@ -272,7 +281,12 @@ App::put('/v1/functions/:functionId')
|
|||
->param('timeout', 15, new Range(1, 900), 'Function maximum execution time in seconds.', true)
|
||||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($functionId, $name, $execute, $vars, $events, $schedule, $timeout, $response, $projectDB) {
|
||||
->inject('project')
|
||||
->action(function ($functionId, $name, $execute, $vars, $events, $schedule, $timeout, $response, $projectDB, $project) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
/** @var Appwrite\Database\Document $project */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
|
@ -291,28 +305,23 @@ App::put('/v1/functions/:functionId')
|
|||
'vars' => $vars,
|
||||
'events' => $events,
|
||||
'schedule' => $schedule,
|
||||
'schedulePrevious' => null,
|
||||
'scheduleNext' => $next,
|
||||
'timeout' => $timeout,
|
||||
]));
|
||||
|
||||
if ($next) {
|
||||
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
|
||||
|
||||
]);
|
||||
|
||||
// ->setParam('projectId', $project->getId())
|
||||
// ->setParam('event', $route->getLabel('event', ''))
|
||||
// ->setParam('payload', [])
|
||||
// ->setParam('functionId', null)
|
||||
// ->setParam('executionId', null)
|
||||
// ->setParam('trigger', 'event')
|
||||
}
|
||||
|
||||
if (false === $function) {
|
||||
throw new Exception('Failed saving function to DB', 500);
|
||||
}
|
||||
|
||||
if ($next) {
|
||||
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
|
||||
'projectId' => $project->getId(),
|
||||
'functionId' => $function->getId(),
|
||||
'executionId' => null,
|
||||
'trigger' => 'schedule',
|
||||
]); // Async task rescheduale
|
||||
}
|
||||
|
||||
$response->dynamic($function, Response::MODEL_FUNCTION);
|
||||
});
|
||||
|
||||
|
@ -331,7 +340,12 @@ App::patch('/v1/functions/:functionId/tag')
|
|||
->param('tag', '', new UID(), 'Tag unique ID.')
|
||||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($functionId, $tag, $response, $projectDB) {
|
||||
->inject('project')
|
||||
->action(function ($functionId, $tag, $response, $projectDB, $project) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
/** @var Appwrite\Database\Document $project */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
$tag = $projectDB->getDocument($tag);
|
||||
|
||||
|
@ -344,14 +358,23 @@ App::patch('/v1/functions/:functionId/tag')
|
|||
}
|
||||
|
||||
$schedule = $function->getAttribute('schedule', '');
|
||||
$cron = (!empty($function->getAttribute('tag')&& !empty($schedule))) ? CronExpression::factory($schedule) : null;
|
||||
$next = (!empty($function->getAttribute('tag')&& !empty($schedule))) ? $cron->getNextRunDate()->format('U') : null;
|
||||
$cron = (empty($function->getAttribute('tag') && !empty($schedule))) ? CronExpression::factory($schedule) : null;
|
||||
$next = (empty($function->getAttribute('tag') && !empty($schedule))) ? $cron->getNextRunDate()->format('U') : null;
|
||||
|
||||
$function = $projectDB->updateDocument(array_merge($function->getArrayCopy(), [
|
||||
'tag' => $tag->getId(),
|
||||
'scheduleNext' => $next,
|
||||
]));
|
||||
|
||||
if ($next) { // Init first schedule
|
||||
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
|
||||
'projectId' => $project->getId(),
|
||||
'functionId' => $function->getId(),
|
||||
'executionId' => null,
|
||||
'trigger' => 'schedule',
|
||||
]); // Async task rescheduale
|
||||
}
|
||||
|
||||
if (false === $function) {
|
||||
throw new Exception('Failed saving function to DB', 500);
|
||||
}
|
||||
|
@ -418,6 +441,11 @@ App::post('/v1/functions/:functionId/tags')
|
|||
->inject('projectDB')
|
||||
->inject('usage')
|
||||
->action(function ($functionId, $command, $code, $request, $response, $projectDB, $usage) {
|
||||
/** @var Utopia\Swoole\Request $request */
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
/** @var Appwrite\Event\Event $usage */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
|
@ -506,6 +534,9 @@ App::get('/v1/functions/:functionId/tags')
|
|||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
|
@ -545,6 +576,9 @@ App::get('/v1/functions/:functionId/tags/:tagId')
|
|||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($functionId, $tagId, $response, $projectDB) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
|
@ -581,6 +615,10 @@ App::delete('/v1/functions/:functionId/tags/:tagId')
|
|||
->inject('projectDB')
|
||||
->inject('usage')
|
||||
->action(function ($functionId, $tagId, $response, $projectDB, $usage) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
/** @var Appwrite\Event\Event $usage */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
|
@ -727,6 +765,9 @@ App::get('/v1/functions/:functionId/executions')
|
|||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($functionId, $search, $limit, $offset, $orderType, $response, $projectDB) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
|
@ -766,6 +807,9 @@ App::get('/v1/functions/:functionId/executions/:executionId')
|
|||
->inject('response')
|
||||
->inject('projectDB')
|
||||
->action(function ($functionId, $executionId, $response, $projectDB) {
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
|
||||
$function = $projectDB->getDocument($functionId);
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
|
|
|
@ -6,6 +6,7 @@ use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
|
|||
use Appwrite\Database\Adapter\Redis as RedisAdapter;
|
||||
use Appwrite\Database\Validator\Authorization;
|
||||
use Appwrite\Event\Event;
|
||||
use Cron\CronExpression;
|
||||
use Swoole\Runtime;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
|
@ -27,7 +28,7 @@ $environments = Config::getParam('environments');
|
|||
$warmupStart = \microtime(true);
|
||||
|
||||
Co\run(function() use ($environments) { // Warmup: make sure images are ready to run fast 🚀
|
||||
Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||
|
||||
foreach($environments as $environment) {
|
||||
go(function() use ($environment) {
|
||||
|
@ -79,14 +80,6 @@ $stdout = \explode("\n", $stdout);
|
|||
\parse_str($value, $container);
|
||||
|
||||
if(isset($container['name'])) {
|
||||
// $labels = [];
|
||||
// $temp = explode(',', $container['labels'] ?? []);
|
||||
|
||||
// foreach($temp as &$label) {
|
||||
// $label = explode('=', $label);
|
||||
// $labels[$label[0] || 0] = $label[1] || '';
|
||||
// }
|
||||
|
||||
$container = [
|
||||
'name' => $container['name'],
|
||||
'online' => (\substr($container['status'], 0, 2) === 'Up'),
|
||||
|
@ -142,6 +135,7 @@ class FunctionsV1
|
|||
$executionId = $this->args['executionId'] ?? '';
|
||||
$trigger = $this->args['trigger'] ?? '';
|
||||
$event = $this->args['event'] ?? '';
|
||||
$scheduleOriginal = $this->args['scheduleOriginal'] ?? '';
|
||||
$payload = (!empty($this->args['payload'])) ? json_encode($this->args['payload']) : '';
|
||||
|
||||
$database = new Database();
|
||||
|
@ -211,6 +205,43 @@ class FunctionsV1
|
|||
* If error count bigger than allowed change status to pause
|
||||
*/
|
||||
|
||||
// Reschedule
|
||||
Authorization::disable();
|
||||
$function = $database->getDocument($functionId);
|
||||
Authorization::reset();
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
throw new Exception('Function not found ('.$functionId.')');
|
||||
}
|
||||
|
||||
if($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
|
||||
return;
|
||||
}
|
||||
|
||||
$cron = CronExpression::factory($function->getAttribute('schedule'));
|
||||
$next = (int) $cron->getNextRunDate()->format('U');
|
||||
|
||||
$function
|
||||
->setAttribute('scheduleNext', $next)
|
||||
->setAttribute('schedulePrevious', \time())
|
||||
;
|
||||
|
||||
$function = $database->updateDocument(array_merge($function->getArrayCopy(), [
|
||||
'scheduleNext' => $next,
|
||||
]));
|
||||
|
||||
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
|
||||
'projectId' => $projectId,
|
||||
'functionId' => $function->getId(),
|
||||
'executionId' => null,
|
||||
'trigger' => 'schedule',
|
||||
'scheduleOriginal' => $function->getAttribute('schedule', ''),
|
||||
]); // Async task rescheduale
|
||||
|
||||
Swoole\Coroutine\run(function () use ($trigger, $projectId, $executionId, $database, $function) {
|
||||
$this->execute($trigger, $projectId, $executionId, $database, $function);
|
||||
});
|
||||
|
||||
break;
|
||||
|
||||
case 'http':
|
||||
|
|
Loading…
Reference in a new issue