queue
This commit is contained in:
parent
bc631d926e
commit
ed505fba7c
5 changed files with 103 additions and 41 deletions
3
.env
3
.env
|
@ -86,6 +86,7 @@ _APP_USAGE_DATABASE_INTERVAL=15
|
|||
_APP_USAGE_STATS=enabled
|
||||
_APP_LOGGING_PROVIDER=
|
||||
_APP_LOGGING_CONFIG=
|
||||
_APP_REGION=nyc1
|
||||
DOCKERHUB_PULL_USERNAME=
|
||||
DOCKERHUB_PULL_PASSWORD=
|
||||
DOCKERHUB_PULL_EMAIL=
|
||||
DOCKERHUB_PULL_EMAIL=
|
||||
|
|
|
@ -760,7 +760,7 @@ $collections = [
|
|||
'$id' => ID::custom('type'),
|
||||
'type' => Database::VAR_STRING,
|
||||
'format' => '',
|
||||
'size' => Database::LENGTH_KEY,
|
||||
'size' => 100,
|
||||
'signed' => true,
|
||||
'required' => false,
|
||||
'default' => null,
|
||||
|
@ -804,7 +804,7 @@ $collections = [
|
|||
'$id' => ID::custom('schedule'),
|
||||
'type' => Database::VAR_STRING,
|
||||
'format' => '',
|
||||
'size' => 256,
|
||||
'size' => 100,
|
||||
'signed' => true,
|
||||
'required' => false,
|
||||
'default' => null,
|
||||
|
@ -822,19 +822,30 @@ $collections = [
|
|||
'default' => null,
|
||||
'array' => false,
|
||||
],
|
||||
[
|
||||
'$id' => ID::custom('region'),
|
||||
'type' => Database::VAR_STRING,
|
||||
'format' => '',
|
||||
'size' => 10,
|
||||
'signed' => true,
|
||||
'required' => true,
|
||||
'default' => null,
|
||||
'array' => false,
|
||||
'filters' => [],
|
||||
],
|
||||
],
|
||||
'indexes' => [
|
||||
[
|
||||
'$id' => ID::custom('_key_type_scheduleUpdatedAt'),
|
||||
'$id' => ID::custom('_key_region_type_scheduleUpdatedAt'),
|
||||
'type' => Database::INDEX_KEY,
|
||||
'attributes' => ['type','scheduleUpdatedAt'],
|
||||
'attributes' => ['region', 'type','scheduleUpdatedAt'],
|
||||
'lengths' => [],
|
||||
'orders' => [],
|
||||
],
|
||||
[
|
||||
'$id' => ID::custom('_key_type_projectId_scheduleId'),
|
||||
'$id' => ID::custom('_key_region_type_projectId_scheduleId'),
|
||||
'type' => Database::INDEX_KEY,
|
||||
'attributes' => ['type', 'projectId', 'scheduleId'],
|
||||
'attributes' => ['region', 'type', 'projectId', 'scheduleId'],
|
||||
'lengths' => [],
|
||||
'orders' => [],
|
||||
],
|
||||
|
|
|
@ -477,6 +477,7 @@ App::put('/v1/functions/:functionId')
|
|||
if ($next) {
|
||||
$schedule = Authorization::skip(function () use ($dbForConsole, $project, $function) {
|
||||
return $dbForConsole->findOne('schedules', [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]), // Todo replace with projects region
|
||||
Query::equal('type', ['function']),
|
||||
Query::equal('projectId', [$project->getId()]),
|
||||
Query::equal('scheduleId', [$function->getId()]),
|
||||
|
@ -487,6 +488,7 @@ App::put('/v1/functions/:functionId')
|
|||
if (empty($schedule)) {
|
||||
Authorization::skip(
|
||||
fn() => $dbForConsole->createDocument('schedules', new Document([
|
||||
'region' => App::getEnv('_APP_REGION'), // Todo replace with projects region
|
||||
'type' => 'function',
|
||||
'scheduleId' => $function->getId(),
|
||||
'projectId' => $project->getId(),
|
||||
|
@ -608,6 +610,7 @@ App::delete('/v1/functions/:functionId')
|
|||
|
||||
$schedule = Authorization::skip(function () use ($dbForConsole, $project, $function) {
|
||||
return $dbForConsole->findOne('schedules', [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]), // Todo replace with projects region
|
||||
Query::equal('type', ['function']),
|
||||
Query::equal('projectId', [$project->getId()]),
|
||||
Query::equal('scheduleId', [$function->getId()]),
|
||||
|
|
|
@ -3,14 +3,16 @@
|
|||
global $cli;
|
||||
global $register;
|
||||
|
||||
use Cron\CronExpression;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Query;
|
||||
use Swoole\Timer;
|
||||
|
||||
|
||||
const FUNCTION_VALIDATION_TIMER = 30; //seconds
|
||||
const FUNCTION_ENQUEUE_TIMER = 10; //seconds
|
||||
const FUNCTION_VALIDATION_TIMER = 180; //seconds
|
||||
const FUNCTION_ENQUEUE_TIMER = 60; //seconds
|
||||
const ENQUEUE_TIME_FRAME = 60 * 5; // 5 min
|
||||
|
||||
$cli
|
||||
->task('schedule-new')
|
||||
|
@ -19,15 +21,33 @@ $cli
|
|||
Console::title('Scheduler V1');
|
||||
Console::success(APP_NAME . ' Scheduler v1 has started');
|
||||
|
||||
sleep(4);
|
||||
$createQueue = function () use (&$functions, &$queue) {
|
||||
/**
|
||||
* Creating smaller functions list containing 5-min timeframe.
|
||||
*/
|
||||
$timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME);
|
||||
foreach ($functions as $function) {
|
||||
$cron = new CronExpression($function['schedule']);
|
||||
$next = DateTime::format($cron->getNextRunDate());
|
||||
if ($next < $timeFrame) {
|
||||
$queue[$next][$function['scheduleId']] = $function;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
$dbForConsole = getConsoleDB();
|
||||
$count = 0;
|
||||
$limit = 50;
|
||||
$sum = $limit;
|
||||
$functions = [];
|
||||
$queue = [];
|
||||
|
||||
/**
|
||||
* Initial run fill $functions list
|
||||
*/
|
||||
while ($sum === $limit) {
|
||||
$results = $dbForConsole->find('schedules', [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||
Query::equal('type', ['function']),
|
||||
Query::equal('active', [true]),
|
||||
Query::limit($limit)
|
||||
|
@ -40,56 +60,81 @@ $cli
|
|||
}
|
||||
}
|
||||
|
||||
$lastValidationTime = DateTime::format((new \DateTime())->sub(\DateInterval::createFromDateString(FUNCTION_VALIDATION_TIMER . ' seconds')));
|
||||
$createQueue();
|
||||
$lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_VALIDATION_TIMER);
|
||||
|
||||
Co\run(
|
||||
function () use ($dbForConsole, &$functions, &$lastValidationTime) {
|
||||
Timer::tick(FUNCTION_VALIDATION_TIMER * 1000, function () use ($dbForConsole, &$functions, &$lastValidationTime) {
|
||||
function () use ($createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
Timer::tick(FUNCTION_VALIDATION_TIMER * 1000, function () use ($createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
$time = DateTime::now();
|
||||
Console::success("Validation proc run at : $time");
|
||||
var_dump($lastValidationTime);
|
||||
$count = 0;
|
||||
$limit = 50;
|
||||
$sum = $limit;
|
||||
$tmp = [];
|
||||
|
||||
Console::info("Update proc run at: $time last update was at $lastUpdate");
|
||||
/**
|
||||
* Updating functions list from DB.
|
||||
*/
|
||||
while ($sum === $limit) {
|
||||
var_dump($lastValidationTime);
|
||||
$results = $dbForConsole->find('schedules', [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||
Query::equal('type', ['function']),
|
||||
Query::greaterThan('scheduleUpdatedAt', $lastValidationTime),
|
||||
Query::greaterThan('scheduleUpdatedAt', $lastUpdate),
|
||||
Query::limit($limit)
|
||||
]);
|
||||
|
||||
$lastValidationTime = DateTime::now();
|
||||
|
||||
$sum = count($results);
|
||||
foreach ($results as $document) {
|
||||
$tmp['scheduleId'] = $document;
|
||||
$org = isset($functions[$document['scheduleId']]) ? strtotime($functions[$document['scheduleId']]['scheduleUpdatedAt']) : null;
|
||||
$new = strtotime($document['scheduleUpdatedAt']);
|
||||
if ($document['active'] === false) {
|
||||
Console::error("Removing: {$document['scheduleId']}");
|
||||
unset($functions[$document['scheduleId']]);
|
||||
} elseif ($new > $org) {
|
||||
Console::error("Updating: {$document['scheduleId']}");
|
||||
$functions[$document['scheduleId']] = $document;
|
||||
}
|
||||
$count++;
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($tmp as $document) {
|
||||
$org = strtotime($functions[$document['scheduleId']]['scheduleUpdatedAt']);
|
||||
$new = strtotime($document['scheduleUpdatedAt']);
|
||||
var_dump($document['scheduleId']);
|
||||
var_dump($document['active']);
|
||||
if ($document['active'] === false) {
|
||||
Console::error("Removing : {$document['scheduleId']}");
|
||||
unset($functions[$document['scheduleId']]);
|
||||
} elseif (!isset($functions[$document['scheduleId']]) || $new > $org) {
|
||||
Console::error("Updating : {$document['scheduleId']}");
|
||||
$functions[$document['scheduleId']] = $document;
|
||||
}
|
||||
$count++;
|
||||
}
|
||||
$lastUpdate = DateTime::now();
|
||||
$createQueue();
|
||||
});
|
||||
|
||||
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, $functions) {
|
||||
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) {
|
||||
$time = DateTime::now();
|
||||
Console::success("Enqueue proc run at : $time");
|
||||
foreach ($functions as $function) {
|
||||
Console::info("Enqueueing : {$function->getid()}");
|
||||
$timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME); /** 5 min */
|
||||
$now = (new \DateTime())->format('Y-m-d H:i:00.000');
|
||||
|
||||
Console::info("Enqueue proc run at: $time");
|
||||
|
||||
/**
|
||||
* Lopping time slots
|
||||
*/
|
||||
foreach ($queue as $slot => $schedule) {
|
||||
if ($now === $slot) {
|
||||
foreach ($schedule as $function) {
|
||||
/**
|
||||
* Enqueue function
|
||||
*/
|
||||
Console::warning("Enqueueing :{$function['scheduleId']}");
|
||||
$cron = new CronExpression($function['schedule']);
|
||||
$next = DateTime::format($cron->getNextRunDate());
|
||||
/**
|
||||
* If next schedule is in 5-min timeframe
|
||||
* and it was not removed re-enqueue the function.
|
||||
*/
|
||||
if (
|
||||
$next < $timeFrame &&
|
||||
!empty($functions[$function['scheduleId']])
|
||||
) {
|
||||
Console::warning("re-enqueueing :{$function['scheduleId']}");
|
||||
$queue[$next][$function['scheduleId']] = $function;
|
||||
}
|
||||
unset($queue[$slot][$function['scheduleId']]); /** removing function from slot */
|
||||
}
|
||||
unset($queue[$slot]); /** removing slot */
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -183,6 +183,7 @@ services:
|
|||
- _APP_MAINTENANCE_RETENTION_AUDIT
|
||||
- _APP_SMS_PROVIDER
|
||||
- _APP_SMS_FROM
|
||||
- _APP_REGION
|
||||
|
||||
appwrite-realtime:
|
||||
entrypoint: realtime
|
||||
|
@ -714,6 +715,7 @@ services:
|
|||
- _APP_CONNECTIONS_DB_CONSOLE
|
||||
- _APP_CONNECTIONS_CACHE
|
||||
- _APP_CONNECTIONS_QUEUE
|
||||
- _APP_REGION
|
||||
|
||||
appwrite-schedule:
|
||||
entrypoint: schedule
|
||||
|
|
Loading…
Reference in a new issue