From ef7b53fbc7577bed8f01926f607278473369f8b1 Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 6 Nov 2022 23:41:33 +0200 Subject: [PATCH] queue --- Dockerfile | 1 - app/config/collections.php | 81 ++++++------------ app/controllers/api/functions.php | 136 +++++++++++------------------- app/tasks/schedule.php | 52 +++++++----- bin/schedule | 9 +- bin/schedule-new | 3 - docker-compose.yml | 27 +----- 7 files changed, 114 insertions(+), 195 deletions(-) delete mode 100644 bin/schedule-new diff --git a/Dockerfile b/Dockerfile index cfa588c9cf..410fb1c44c 100755 --- a/Dockerfile +++ b/Dockerfile @@ -349,7 +349,6 @@ RUN chmod +x /usr/local/bin/doctor && \ chmod +x /usr/local/bin/realtime && \ chmod +x /usr/local/bin/executor && \ chmod +x /usr/local/bin/schedule && \ - chmod +x /usr/local/bin/schedule-new && \ chmod +x /usr/local/bin/sdks && \ chmod +x /usr/local/bin/specs && \ chmod +x /usr/local/bin/ssl && \ diff --git a/app/config/collections.php b/app/config/collections.php index 17f1fa8c41..345d802dc4 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -757,7 +757,7 @@ $collections = [ 'name' => 'schedules', 'attributes' => [ [ - '$id' => ID::custom('type'), + '$id' => ID::custom('resourceType'), 'type' => Database::VAR_STRING, 'format' => '', 'size' => 100, @@ -768,7 +768,7 @@ $collections = [ 'filters' => [], ], [ - '$id' => ID::custom('scheduleId'), + '$id' => ID::custom('resourceId'), 'type' => Database::VAR_STRING, 'format' => '', 'size' => Database::LENGTH_KEY, @@ -778,6 +778,17 @@ $collections = [ 'array' => false, 'filters' => [], ], + [ + '$id' => ID::custom('resourceUpdatedAt'), + 'type' => Database::VAR_DATETIME, + 'format' => '', + 'size' => 0, + 'signed' => false, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => ['datetime'], + ], [ '$id' => ID::custom('projectId'), 'type' => Database::VAR_STRING, @@ -789,17 +800,6 @@ $collections = [ 'array' => false, 'filters' => [], ], - [ - '$id' => ID::custom('scheduleUpdatedAt'), - 'type' => Database::VAR_DATETIME, - 'format' => '', - 'size' => 0, - 'signed' => false, - 'required' => false, - 'default' => null, - 'array' => false, - 'filters' => ['datetime'], - ], [ '$id' => ID::custom('schedule'), 'type' => Database::VAR_STRING, @@ -836,16 +836,16 @@ $collections = [ ], 'indexes' => [ [ - '$id' => ID::custom('_key_region_type_scheduleUpdatedAt'), + '$id' => ID::custom('_key_region_resourceType_resourceUpdatedAt'), 'type' => Database::INDEX_KEY, - 'attributes' => ['region', 'type','scheduleUpdatedAt'], + 'attributes' => ['region', 'resourceType','resourceUpdatedAt'], 'lengths' => [], 'orders' => [], ], [ - '$id' => ID::custom('_key_region_type_projectId_scheduleId'), + '$id' => ID::custom('_key_region_resourceType_projectId_resourceId'), 'type' => Database::INDEX_KEY, - 'attributes' => ['region', 'type', 'projectId', 'scheduleId'], + 'attributes' => ['region', 'resourceType', 'projectId', 'resourceId'], 'lengths' => [], 'orders' => [], ], @@ -2249,6 +2249,17 @@ $collections = [ 'array' => true, 'filters' => [], ], + [ + '$id' => ID::custom('scheduleId'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => Database::LENGTH_KEY, + 'signed' => true, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => [], + ], [ '$id' => ID::custom('schedule'), 'type' => Database::VAR_STRING, @@ -2271,28 +2282,6 @@ $collections = [ 'array' => false, 'filters' => ['datetime'], ], - [ - '$id' => ID::custom('schedulePrevious'), - 'type' => Database::VAR_DATETIME, - 'format' => '', - 'size' => 0, - 'signed' => false, - 'required' => false, - 'default' => null, - 'array' => false, - 'filters' => ['datetime'], - ], - [ - '$id' => ID::custom('scheduleNext'), - 'type' => Database::VAR_DATETIME, - 'format' => '', - 'size' => 0, - 'signed' => false, - 'required' => false, - 'default' => null, - 'array' => false, - 'filters' => ['datetime'], - ], [ '$id' => ID::custom('timeout'), 'type' => Database::VAR_INTEGER, @@ -2359,20 +2348,6 @@ $collections = [ 'lengths' => [128], 'orders' => [Database::ORDER_ASC], ], - [ - '$id' => ID::custom('_key_scheduleNext'), - 'type' => Database::INDEX_KEY, - 'attributes' => ['scheduleNext'], - 'lengths' => [], - 'orders' => [Database::ORDER_ASC], - ], - [ - '$id' => ID::custom('_key_schedulePrevious'), - 'type' => Database::INDEX_KEY, - 'attributes' => ['schedulePrevious'], - 'lengths' => [], - 'orders' => [Database::ORDER_ASC], - ], [ '$id' => ID::custom('_key_timeout'), 'type' => Database::INDEX_KEY, diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index a7c5dd7763..0edfa26eff 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -36,7 +36,6 @@ use Utopia\Validator\Text; use Utopia\Validator\Range; use Utopia\Validator\WhiteList; use Utopia\Config\Config; -use Cron\CronExpression; use Executor\Executor; use Utopia\CLI\Console; use Utopia\Database\Validator\Roles; @@ -72,10 +71,8 @@ App::post('/v1/functions') ->inject('project') ->inject('user') ->inject('events') - ->action(function (string $functionId, string $name, array $execute, string $runtime, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance) { - - $cron = !empty($schedule) ? new CronExpression($schedule) : null; - $next = !empty($schedule) ? DateTime::format($cron->getNextRunDate()) : null; + ->inject('dbForConsole') + ->action(function (string $functionId, string $name, array $execute, string $runtime, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance, Database $dbForConsole) { $functionId = ($functionId == 'unique()') ? ID::unique() : $functionId; $function = $dbForProject->createDocument('functions', new Document([ @@ -88,22 +85,24 @@ App::post('/v1/functions') 'events' => $events, 'schedule' => $schedule, 'scheduleUpdatedAt' => DateTime::now(), - 'schedulePrevious' => null, - 'scheduleNext' => $next, 'timeout' => $timeout, 'search' => implode(' ', [$functionId, $name, $runtime]) ])); - if ($next) { - // Async task reschedule - $functionEvent = new Func(); - $functionEvent - ->setFunction($function) - ->setType('schedule') - ->setUser($user) - ->setProject($project) - ->schedule(new \DateTime($next)); - } + $log = Authorization::skip( + fn() => $dbForConsole->createDocument('schedules', new Document([ + 'region' => App::getEnv('_APP_REGION'), // Todo replace with projects region + 'resourceType' => 'function', + 'resourceId' => $function->getId(), + 'resourceUpdatedAt' => DateTime::now(), + 'projectId' => $project->getId(), + 'schedule' => $function['schedule'], + 'active' => false, + ])) + ); + + $function->setAttribute('scheduleId', $log->getId()); + $dbForProject->updateDocument('functions', $function->getId(), $function); $eventsInstance->setParam('functionId', $function->getId()); @@ -457,66 +456,25 @@ App::put('/v1/functions/:functionId') throw new Exception(Exception::FUNCTION_NOT_FOUND); } - $cron = !empty($schedule) ? new CronExpression($schedule) : null; - $next = !empty($schedule) ? DateTime::format($cron->getNextRunDate()) : null; - $enabled ??= $function->getAttribute('enabled', true); + $log = $schedule + ->setAttribute('resourceUpdatedAt', $function['resourceUpdatedAt']) + ->setAttribute('schedule', $function['schedule']) + ->setAttribute('active', !empty($function->getAttribute('schedule') || !empty($schedule))); + $function = $dbForProject->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [ 'execute' => $execute, 'name' => $name, 'events' => $events, + 'scheduleId' => $log->getId(), 'schedule' => $schedule, 'scheduleUpdatedAt' => DateTime::now(), - 'scheduleNext' => $next, 'timeout' => $timeout, 'enabled' => $enabled, 'search' => implode(' ', [$functionId, $name, $function->getAttribute('runtime')]), ]))); - if ($next) { - $schedule = Authorization::skip(function () use ($dbForConsole, $project, $function) { - return $dbForConsole->findOne('schedules', [ - Query::equal('region', [App::getEnv('_APP_REGION')]), // Todo replace with projects region - Query::equal('type', ['function']), - Query::equal('projectId', [$project->getId()]), - Query::equal('scheduleId', [$function->getId()]), - ]); - }); - - // TODO constrain with unique index ?? - if (empty($schedule)) { - Authorization::skip( - fn() => $dbForConsole->createDocument('schedules', new Document([ - 'region' => App::getEnv('_APP_REGION'), // Todo replace with projects region - 'type' => 'function', - 'scheduleId' => $function->getId(), - 'projectId' => $project->getId(), - 'scheduleUpdatedAt' => $function['scheduleUpdatedAt'], - 'schedule' => $function['schedule'], - 'active' => true, - ])) - ); - } else { - $schedule - ->setAttribute('scheduleUpdatedAt', $function['scheduleUpdatedAt']) - ->setAttribute('schedule', $function['schedule']) - ; - Authorization::skip(function () use ($dbForConsole, $schedule, $function) { - $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule); - }); - } - - // Async task reschedule - $functionEvent = new Func(); - $functionEvent - ->setFunction($function) - ->setType('schedule') - ->setUser($user) - ->setProject($project) - ->schedule(new \DateTime($next)); - } - $eventsInstance->setParam('functionId', $function->getId()); $response->dynamic($function, Response::MODEL_FUNCTION); @@ -542,7 +500,8 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId') ->inject('dbForProject') ->inject('project') ->inject('events') - ->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $events) { + ->inject('dbForConsole') + ->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $events, Database $dbForConsole) { $function = $dbForProject->getDocument('functions', $functionId); $deployment = $dbForProject->getDocument('deployments', $deploymentId); @@ -568,6 +527,14 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId') 'deployment' => $deployment->getId() ]))); + $log = $dbForProject->getDocument('schedules', $function['resourceId']); + + $log->setAttribute('active', true); + + Authorization::skip(function () use ($dbForConsole, $log) { + $dbForConsole->updateDocument('schedules', $log->getId(), $log); + }); + $events ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()); @@ -595,7 +562,6 @@ App::delete('/v1/functions/:functionId') ->inject('events') ->inject('project') ->inject('dbForConsole') - ->action(function (string $functionId, Response $response, Database $dbForProject, Delete $deletes, Event $events, Document $project, Database $dbForConsole) { $function = $dbForProject->getDocument('functions', $functionId); @@ -608,26 +574,17 @@ App::delete('/v1/functions/:functionId') throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB'); } - $schedule = Authorization::skip(function () use ($dbForConsole, $project, $function) { - return $dbForConsole->findOne('schedules', [ - Query::equal('region', [App::getEnv('_APP_REGION')]), // Todo replace with projects region - Query::equal('type', ['function']), - Query::equal('projectId', [$project->getId()]), - Query::equal('scheduleId', [$function->getId()]), - ]); + $log = $dbForProject->getDocument('schedules', $function['resourceId']); + + $log + ->setAttribute('resourceUpdatedAt', DateTime::now()) + ->setAttribute('active', false) + ; + + Authorization::skip(function () use ($dbForConsole, $log) { + $dbForConsole->updateDocument('schedules', $log->getId(), $log); }); - if (!empty($schedule)) { - $schedule - ->setAttribute('scheduleUpdatedAt', DateTime::now()) - ->setAttribute('active', false) - ; - - Authorization::skip(function () use ($dbForConsole, $schedule) { - $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule); - }); - } - $deletes ->setType(DELETE_TYPE_DOCUMENT) ->setDocument($function); @@ -664,7 +621,8 @@ App::post('/v1/functions/:functionId/deployments') ->inject('project') ->inject('deviceFunctions') ->inject('deviceLocal') - ->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $events, Document $project, Device $deviceFunctions, Device $deviceLocal) { + ->inject('dbForConsole') + ->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $events, Document $project, Device $deviceFunctions, Device $deviceLocal, Database $dbForConsole) { $function = $dbForProject->getDocument('functions', $functionId); @@ -816,6 +774,14 @@ App::post('/v1/functions/:functionId/deployments') } } + $log = $dbForProject->getDocument('schedules', $function['resourceId']); + + $log->setAttribute('active', true); + + Authorization::skip(function () use ($dbForConsole, $log) { + $dbForConsole->updateDocument('schedules', $log->getId(), $log); + }); + $metadata = null; $events diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index b18f570af7..4664c8c10f 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -16,7 +16,7 @@ const ENQUEUE_TIME_FRAME = 60 * 5; // 5 min sleep(4); // Todo prevent PDOException $cli -->task('schedule-new') +->task('schedule') ->desc('Function scheduler task') ->action(function () use ($register) { Console::title('Scheduler V1'); @@ -31,7 +31,7 @@ $cli $cron = new CronExpression($function['schedule']); $next = DateTime::format($cron->getNextRunDate()); if ($next < $timeFrame) { - $queue[$next][$function['scheduleId']] = $function; + $queue[$next][$function['resourceId']] = $function; } } }; @@ -39,8 +39,9 @@ $cli $removeFromQueue = function ($scheduleId) use (&$queue) { foreach ($queue as $slot => $schedule) { foreach ($schedule as $function) { - if ($scheduleId === $function['scheduleId']) { - unset($queue[$slot][$function['scheduleId']]); + if ($scheduleId === $function['resourceId']) { + Console::error("Unsetting :{$function['resourceId']} from queue slot $slot"); + unset($queue[$slot][$function['resourceId']]); } } } @@ -60,14 +61,14 @@ $cli while ($sum === $limit) { $results = $dbForConsole->find('schedules', [ Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::equal('type', ['function']), + Query::equal('resourceType', ['function']), Query::equal('active', [true]), Query::limit($limit) ]); $sum = count($results); foreach ($results as $document) { - $functions[$document['scheduleId']] = $document; + $functions[$document['resourceId']] = $document; $count++; } } @@ -90,22 +91,22 @@ $cli while ($sum === $limit) { $results = $dbForConsole->find('schedules', [ Query::equal('region', [App::getEnv('_APP_REGION')]), - Query::equal('type', ['function']), - Query::greaterThan('scheduleUpdatedAt', $lastUpdate), + Query::equal('resourceType', ['function']), + Query::greaterThan('resourceUpdatedAt', $lastUpdate), Query::limit($limit) ]); $sum = count($results); foreach ($results as $document) { - $org = isset($functions[$document['scheduleId']]) ? strtotime($functions[$document['scheduleId']]['scheduleUpdatedAt']) : null; - $new = strtotime($document['scheduleUpdatedAt']); + $org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null; + $new = strtotime($document['resourceUpdatedAt']); if ($document['active'] === false) { - Console::error("Removing: {$document['scheduleId']}"); - unset($functions[$document['scheduleId']]); + Console::error("Removing: {$document['resourceId']}"); + unset($functions[$document['resourceId']]); } elseif ($new > $org) { - Console::error("Updating: {$document['scheduleId']}"); - $functions[$document['scheduleId']] = $document; + Console::error("Updating: {$document['resourceId']}"); + $functions[$document['resourceId']] = $document; } - $removeFromQueue($document['scheduleId']); + $removeFromQueue($document['resourceId']); $count++; } } @@ -120,31 +121,40 @@ $cli $now = (new \DateTime())->format('Y-m-d H:i:00.000'); Console::info("Enqueue proc run at: $time"); + // Debug + foreach ($queue as $slot => $schedule) { + Console::log("Slot: $slot"); + foreach ($schedule as $function) { + Console::log("{$function['resourceId']} {$function['schedule']}"); + } + } /** * Lopping time slots */ + foreach ($queue as $slot => $schedule) { if ($now === $slot) { foreach ($schedule as $function) { /** * Enqueue function */ - Console::warning("Enqueueing :{$function['scheduleId']}"); + Console::warning("Enqueueing :{$function['resourceId']}"); $cron = new CronExpression($function['schedule']); $next = DateTime::format($cron->getNextRunDate()); /** * If next schedule is in 5-min timeframe - * and it was not removed re-enqueue the function. + * and it was not removed or changed, re-enqueue the function. */ if ( $next < $timeFrame && - !empty($functions[$function['scheduleId']]) + !empty($functions[$function['resourceId']] && + $function['schedule'] === $functions[$function['resourceId']]['schedule']) ) { - Console::warning("re-enqueueing :{$function['scheduleId']}"); - $queue[$next][$function['scheduleId']] = $function; + Console::warning("re-enqueueing :{$function['resourceId']}"); + $queue[$next][$function['resourceId']] = $function; } - unset($queue[$slot][$function['scheduleId']]); /** removing function from slot */ + unset($queue[$slot][$function['resourceId']]); /** removing function from slot */ } unset($queue[$slot]); /** removing slot */ } diff --git a/bin/schedule b/bin/schedule index dbc6d94d96..ddd1ea7f35 100644 --- a/bin/schedule +++ b/bin/schedule @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=1 REDIS_BACKEND=$REDIS_BACKEND RESQUE_PHP='/usr/src/code/vendor/autoload.php' php /usr/src/code/vendor/bin/resque-scheduler +php /usr/src/code/app/cli.php schedule $@ \ No newline at end of file diff --git a/bin/schedule-new b/bin/schedule-new deleted file mode 100644 index 6e6e54266b..0000000000 --- a/bin/schedule-new +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -php /usr/src/code/app/cli.php schedule-new $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 34a7d1c2b7..0dac1dc094 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -686,17 +686,16 @@ services: - _APP_LOGGING_PROVIDER - _APP_LOGGING_CONFIG - appwrite-schedule-new: - entrypoint: schedule-new + appwrite-schedule: + entrypoint: schedule <<: *x-logging - container_name: appwrite-schedule-new + container_name: appwrite-schedule image: appwrite-dev networks: - appwrite volumes: - ./app:/usr/src/code/app - ./src:/usr/src/code/src - - ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database depends_on: - mariadb - redis @@ -717,26 +716,6 @@ services: - _APP_CONNECTIONS_QUEUE - _APP_REGION - appwrite-schedule: - entrypoint: schedule - <<: *x-logging - container_name: appwrite-schedule - image: appwrite-dev - networks: - - appwrite - volumes: - - ./app:/usr/src/code/app - - ./src:/usr/src/code/src - depends_on: - - redis - environment: - - _APP_ENV - - _APP_REDIS_HOST - - _APP_REDIS_PORT - - _APP_REDIS_USER - - _APP_REDIS_PASS - - _APP_CONNECTIONS_QUEUE - mariadb: image: mariadb:10.7 # fix issues when upgrading using: mysql_upgrade -u root -p container_name: appwrite-mariadb