1
0
Fork 0
mirror of synced 2024-06-27 18:50:47 +12:00
This commit is contained in:
shimon 2022-11-06 23:41:33 +02:00
parent 0e99198fa8
commit ef7b53fbc7
7 changed files with 114 additions and 195 deletions

View file

@ -349,7 +349,6 @@ RUN chmod +x /usr/local/bin/doctor && \
chmod +x /usr/local/bin/realtime && \
chmod +x /usr/local/bin/executor && \
chmod +x /usr/local/bin/schedule && \
chmod +x /usr/local/bin/schedule-new && \
chmod +x /usr/local/bin/sdks && \
chmod +x /usr/local/bin/specs && \
chmod +x /usr/local/bin/ssl && \

View file

@ -757,7 +757,7 @@ $collections = [
'name' => 'schedules',
'attributes' => [
[
'$id' => ID::custom('type'),
'$id' => ID::custom('resourceType'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 100,
@ -768,7 +768,7 @@ $collections = [
'filters' => [],
],
[
'$id' => ID::custom('scheduleId'),
'$id' => ID::custom('resourceId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
@ -778,6 +778,17 @@ $collections = [
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('resourceUpdatedAt'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('projectId'),
'type' => Database::VAR_STRING,
@ -789,17 +800,6 @@ $collections = [
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('scheduleUpdatedAt'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('schedule'),
'type' => Database::VAR_STRING,
@ -836,16 +836,16 @@ $collections = [
],
'indexes' => [
[
'$id' => ID::custom('_key_region_type_scheduleUpdatedAt'),
'$id' => ID::custom('_key_region_resourceType_resourceUpdatedAt'),
'type' => Database::INDEX_KEY,
'attributes' => ['region', 'type','scheduleUpdatedAt'],
'attributes' => ['region', 'resourceType','resourceUpdatedAt'],
'lengths' => [],
'orders' => [],
],
[
'$id' => ID::custom('_key_region_type_projectId_scheduleId'),
'$id' => ID::custom('_key_region_resourceType_projectId_resourceId'),
'type' => Database::INDEX_KEY,
'attributes' => ['region', 'type', 'projectId', 'scheduleId'],
'attributes' => ['region', 'resourceType', 'projectId', 'resourceId'],
'lengths' => [],
'orders' => [],
],
@ -2249,6 +2249,17 @@ $collections = [
'array' => true,
'filters' => [],
],
[
'$id' => ID::custom('scheduleId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('schedule'),
'type' => Database::VAR_STRING,
@ -2271,28 +2282,6 @@ $collections = [
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('schedulePrevious'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('scheduleNext'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('timeout'),
'type' => Database::VAR_INTEGER,
@ -2359,20 +2348,6 @@ $collections = [
'lengths' => [128],
'orders' => [Database::ORDER_ASC],
],
[
'$id' => ID::custom('_key_scheduleNext'),
'type' => Database::INDEX_KEY,
'attributes' => ['scheduleNext'],
'lengths' => [],
'orders' => [Database::ORDER_ASC],
],
[
'$id' => ID::custom('_key_schedulePrevious'),
'type' => Database::INDEX_KEY,
'attributes' => ['schedulePrevious'],
'lengths' => [],
'orders' => [Database::ORDER_ASC],
],
[
'$id' => ID::custom('_key_timeout'),
'type' => Database::INDEX_KEY,

View file

@ -36,7 +36,6 @@ use Utopia\Validator\Text;
use Utopia\Validator\Range;
use Utopia\Validator\WhiteList;
use Utopia\Config\Config;
use Cron\CronExpression;
use Executor\Executor;
use Utopia\CLI\Console;
use Utopia\Database\Validator\Roles;
@ -72,10 +71,8 @@ App::post('/v1/functions')
->inject('project')
->inject('user')
->inject('events')
->action(function (string $functionId, string $name, array $execute, string $runtime, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance) {
$cron = !empty($schedule) ? new CronExpression($schedule) : null;
$next = !empty($schedule) ? DateTime::format($cron->getNextRunDate()) : null;
->inject('dbForConsole')
->action(function (string $functionId, string $name, array $execute, string $runtime, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance, Database $dbForConsole) {
$functionId = ($functionId == 'unique()') ? ID::unique() : $functionId;
$function = $dbForProject->createDocument('functions', new Document([
@ -88,22 +85,24 @@ App::post('/v1/functions')
'events' => $events,
'schedule' => $schedule,
'scheduleUpdatedAt' => DateTime::now(),
'schedulePrevious' => null,
'scheduleNext' => $next,
'timeout' => $timeout,
'search' => implode(' ', [$functionId, $name, $runtime])
]));
if ($next) {
// Async task reschedule
$functionEvent = new Func();
$functionEvent
->setFunction($function)
->setType('schedule')
->setUser($user)
->setProject($project)
->schedule(new \DateTime($next));
}
$log = Authorization::skip(
fn() => $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION'), // Todo replace with projects region
'resourceType' => 'function',
'resourceId' => $function->getId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $function['schedule'],
'active' => false,
]))
);
$function->setAttribute('scheduleId', $log->getId());
$dbForProject->updateDocument('functions', $function->getId(), $function);
$eventsInstance->setParam('functionId', $function->getId());
@ -457,66 +456,25 @@ App::put('/v1/functions/:functionId')
throw new Exception(Exception::FUNCTION_NOT_FOUND);
}
$cron = !empty($schedule) ? new CronExpression($schedule) : null;
$next = !empty($schedule) ? DateTime::format($cron->getNextRunDate()) : null;
$enabled ??= $function->getAttribute('enabled', true);
$log = $schedule
->setAttribute('resourceUpdatedAt', $function['resourceUpdatedAt'])
->setAttribute('schedule', $function['schedule'])
->setAttribute('active', !empty($function->getAttribute('schedule') || !empty($schedule)));
$function = $dbForProject->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [
'execute' => $execute,
'name' => $name,
'events' => $events,
'scheduleId' => $log->getId(),
'schedule' => $schedule,
'scheduleUpdatedAt' => DateTime::now(),
'scheduleNext' => $next,
'timeout' => $timeout,
'enabled' => $enabled,
'search' => implode(' ', [$functionId, $name, $function->getAttribute('runtime')]),
])));
if ($next) {
$schedule = Authorization::skip(function () use ($dbForConsole, $project, $function) {
return $dbForConsole->findOne('schedules', [
Query::equal('region', [App::getEnv('_APP_REGION')]), // Todo replace with projects region
Query::equal('type', ['function']),
Query::equal('projectId', [$project->getId()]),
Query::equal('scheduleId', [$function->getId()]),
]);
});
// TODO constrain with unique index ??
if (empty($schedule)) {
Authorization::skip(
fn() => $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION'), // Todo replace with projects region
'type' => 'function',
'scheduleId' => $function->getId(),
'projectId' => $project->getId(),
'scheduleUpdatedAt' => $function['scheduleUpdatedAt'],
'schedule' => $function['schedule'],
'active' => true,
]))
);
} else {
$schedule
->setAttribute('scheduleUpdatedAt', $function['scheduleUpdatedAt'])
->setAttribute('schedule', $function['schedule'])
;
Authorization::skip(function () use ($dbForConsole, $schedule, $function) {
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
});
}
// Async task reschedule
$functionEvent = new Func();
$functionEvent
->setFunction($function)
->setType('schedule')
->setUser($user)
->setProject($project)
->schedule(new \DateTime($next));
}
$eventsInstance->setParam('functionId', $function->getId());
$response->dynamic($function, Response::MODEL_FUNCTION);
@ -542,7 +500,8 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId')
->inject('dbForProject')
->inject('project')
->inject('events')
->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $events) {
->inject('dbForConsole')
->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $events, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
@ -568,6 +527,14 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId')
'deployment' => $deployment->getId()
])));
$log = $dbForProject->getDocument('schedules', $function['resourceId']);
$log->setAttribute('active', true);
Authorization::skip(function () use ($dbForConsole, $log) {
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
});
$events
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId());
@ -595,7 +562,6 @@ App::delete('/v1/functions/:functionId')
->inject('events')
->inject('project')
->inject('dbForConsole')
->action(function (string $functionId, Response $response, Database $dbForProject, Delete $deletes, Event $events, Document $project, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
@ -608,26 +574,17 @@ App::delete('/v1/functions/:functionId')
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB');
}
$schedule = Authorization::skip(function () use ($dbForConsole, $project, $function) {
return $dbForConsole->findOne('schedules', [
Query::equal('region', [App::getEnv('_APP_REGION')]), // Todo replace with projects region
Query::equal('type', ['function']),
Query::equal('projectId', [$project->getId()]),
Query::equal('scheduleId', [$function->getId()]),
]);
$log = $dbForProject->getDocument('schedules', $function['resourceId']);
$log
->setAttribute('resourceUpdatedAt', DateTime::now())
->setAttribute('active', false)
;
Authorization::skip(function () use ($dbForConsole, $log) {
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
});
if (!empty($schedule)) {
$schedule
->setAttribute('scheduleUpdatedAt', DateTime::now())
->setAttribute('active', false)
;
Authorization::skip(function () use ($dbForConsole, $schedule) {
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
});
}
$deletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($function);
@ -664,7 +621,8 @@ App::post('/v1/functions/:functionId/deployments')
->inject('project')
->inject('deviceFunctions')
->inject('deviceLocal')
->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $events, Document $project, Device $deviceFunctions, Device $deviceLocal) {
->inject('dbForConsole')
->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $events, Document $project, Device $deviceFunctions, Device $deviceLocal, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
@ -816,6 +774,14 @@ App::post('/v1/functions/:functionId/deployments')
}
}
$log = $dbForProject->getDocument('schedules', $function['resourceId']);
$log->setAttribute('active', true);
Authorization::skip(function () use ($dbForConsole, $log) {
$dbForConsole->updateDocument('schedules', $log->getId(), $log);
});
$metadata = null;
$events

View file

@ -16,7 +16,7 @@ const ENQUEUE_TIME_FRAME = 60 * 5; // 5 min
sleep(4); // Todo prevent PDOException
$cli
->task('schedule-new')
->task('schedule')
->desc('Function scheduler task')
->action(function () use ($register) {
Console::title('Scheduler V1');
@ -31,7 +31,7 @@ $cli
$cron = new CronExpression($function['schedule']);
$next = DateTime::format($cron->getNextRunDate());
if ($next < $timeFrame) {
$queue[$next][$function['scheduleId']] = $function;
$queue[$next][$function['resourceId']] = $function;
}
}
};
@ -39,8 +39,9 @@ $cli
$removeFromQueue = function ($scheduleId) use (&$queue) {
foreach ($queue as $slot => $schedule) {
foreach ($schedule as $function) {
if ($scheduleId === $function['scheduleId']) {
unset($queue[$slot][$function['scheduleId']]);
if ($scheduleId === $function['resourceId']) {
Console::error("Unsetting :{$function['resourceId']} from queue slot $slot");
unset($queue[$slot][$function['resourceId']]);
}
}
}
@ -60,14 +61,14 @@ $cli
while ($sum === $limit) {
$results = $dbForConsole->find('schedules', [
Query::equal('region', [App::getEnv('_APP_REGION')]),
Query::equal('type', ['function']),
Query::equal('resourceType', ['function']),
Query::equal('active', [true]),
Query::limit($limit)
]);
$sum = count($results);
foreach ($results as $document) {
$functions[$document['scheduleId']] = $document;
$functions[$document['resourceId']] = $document;
$count++;
}
}
@ -90,22 +91,22 @@ $cli
while ($sum === $limit) {
$results = $dbForConsole->find('schedules', [
Query::equal('region', [App::getEnv('_APP_REGION')]),
Query::equal('type', ['function']),
Query::greaterThan('scheduleUpdatedAt', $lastUpdate),
Query::equal('resourceType', ['function']),
Query::greaterThan('resourceUpdatedAt', $lastUpdate),
Query::limit($limit)
]);
$sum = count($results);
foreach ($results as $document) {
$org = isset($functions[$document['scheduleId']]) ? strtotime($functions[$document['scheduleId']]['scheduleUpdatedAt']) : null;
$new = strtotime($document['scheduleUpdatedAt']);
$org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null;
$new = strtotime($document['resourceUpdatedAt']);
if ($document['active'] === false) {
Console::error("Removing: {$document['scheduleId']}");
unset($functions[$document['scheduleId']]);
Console::error("Removing: {$document['resourceId']}");
unset($functions[$document['resourceId']]);
} elseif ($new > $org) {
Console::error("Updating: {$document['scheduleId']}");
$functions[$document['scheduleId']] = $document;
Console::error("Updating: {$document['resourceId']}");
$functions[$document['resourceId']] = $document;
}
$removeFromQueue($document['scheduleId']);
$removeFromQueue($document['resourceId']);
$count++;
}
}
@ -120,31 +121,40 @@ $cli
$now = (new \DateTime())->format('Y-m-d H:i:00.000');
Console::info("Enqueue proc run at: $time");
// Debug
foreach ($queue as $slot => $schedule) {
Console::log("Slot: $slot");
foreach ($schedule as $function) {
Console::log("{$function['resourceId']} {$function['schedule']}");
}
}
/**
* Lopping time slots
*/
foreach ($queue as $slot => $schedule) {
if ($now === $slot) {
foreach ($schedule as $function) {
/**
* Enqueue function
*/
Console::warning("Enqueueing :{$function['scheduleId']}");
Console::warning("Enqueueing :{$function['resourceId']}");
$cron = new CronExpression($function['schedule']);
$next = DateTime::format($cron->getNextRunDate());
/**
* If next schedule is in 5-min timeframe
* and it was not removed re-enqueue the function.
* and it was not removed or changed, re-enqueue the function.
*/
if (
$next < $timeFrame &&
!empty($functions[$function['scheduleId']])
!empty($functions[$function['resourceId']] &&
$function['schedule'] === $functions[$function['resourceId']]['schedule'])
) {
Console::warning("re-enqueueing :{$function['scheduleId']}");
$queue[$next][$function['scheduleId']] = $function;
Console::warning("re-enqueueing :{$function['resourceId']}");
$queue[$next][$function['resourceId']] = $function;
}
unset($queue[$slot][$function['scheduleId']]); /** removing function from slot */
unset($queue[$slot][$function['resourceId']]); /** removing function from slot */
}
unset($queue[$slot]); /** removing slot */
}

View file

@ -1,10 +1,3 @@
#!/bin/sh
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ]
then
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
else
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
fi
INTERVAL=1 REDIS_BACKEND=$REDIS_BACKEND RESQUE_PHP='/usr/src/code/vendor/autoload.php' php /usr/src/code/vendor/bin/resque-scheduler
php /usr/src/code/app/cli.php schedule $@

View file

@ -1,3 +0,0 @@
#!/bin/sh
php /usr/src/code/app/cli.php schedule-new $@

View file

@ -686,17 +686,16 @@ services:
- _APP_LOGGING_PROVIDER
- _APP_LOGGING_CONFIG
appwrite-schedule-new:
entrypoint: schedule-new
appwrite-schedule:
entrypoint: schedule
<<: *x-logging
container_name: appwrite-schedule-new
container_name: appwrite-schedule
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
depends_on:
- mariadb
- redis
@ -717,26 +716,6 @@ services:
- _APP_CONNECTIONS_QUEUE
- _APP_REGION
appwrite-schedule:
entrypoint: schedule
<<: *x-logging
container_name: appwrite-schedule
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
environment:
- _APP_ENV
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_CONNECTIONS_QUEUE
mariadb:
image: mariadb:10.7 # fix issues when upgrading using: mysql_upgrade -u root -p
container_name: appwrite-mariadb