Abstract scheduling base
This commit is contained in:
parent
a40c6fce64
commit
81f1eb3506
6 changed files with 348 additions and 406 deletions
|
@ -263,6 +263,9 @@ class Exception extends \Exception
|
||||||
public const MESSAGE_TARGET_NOT_SMS = 'message_target_not_sms';
|
public const MESSAGE_TARGET_NOT_SMS = 'message_target_not_sms';
|
||||||
public const MESSAGE_TARGET_NOT_PUSH = 'message_target_not_push';
|
public const MESSAGE_TARGET_NOT_PUSH = 'message_target_not_push';
|
||||||
|
|
||||||
|
/** Schedules */
|
||||||
|
public const SCHEDULE_NOT_FOUND = 'schedule_not_found';
|
||||||
|
|
||||||
|
|
||||||
protected string $type = '';
|
protected string $type = '';
|
||||||
protected array $errors = [];
|
protected array $errors = [];
|
||||||
|
|
|
@ -1,244 +0,0 @@
|
||||||
<?php
|
|
||||||
|
|
||||||
namespace Appwrite\Platform\Tasks;
|
|
||||||
|
|
||||||
use Cron\CronExpression;
|
|
||||||
use Swoole\Timer;
|
|
||||||
use Utopia\App;
|
|
||||||
use Utopia\Platform\Action;
|
|
||||||
use Utopia\CLI\Console;
|
|
||||||
use Utopia\Database\DateTime;
|
|
||||||
use Utopia\Database\Document;
|
|
||||||
use Utopia\Database\Query;
|
|
||||||
use Utopia\Database\Database;
|
|
||||||
use Utopia\Pools\Group;
|
|
||||||
use Appwrite\Event\Func;
|
|
||||||
|
|
||||||
use function Swoole\Coroutine\run;
|
|
||||||
|
|
||||||
class Schedule extends Action
|
|
||||||
{
|
|
||||||
public const FUNCTION_UPDATE_TIMER = 10; //seconds
|
|
||||||
public const FUNCTION_ENQUEUE_TIMER = 60; //seconds
|
|
||||||
|
|
||||||
public static function getName(): string
|
|
||||||
{
|
|
||||||
return 'schedule';
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __construct()
|
|
||||||
{
|
|
||||||
$this
|
|
||||||
->desc('Execute functions scheduled in Appwrite')
|
|
||||||
->inject('pools')
|
|
||||||
->inject('dbForConsole')
|
|
||||||
->inject('getProjectDB')
|
|
||||||
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 1. Load all documents from 'schedules' collection to create local copy
|
|
||||||
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
|
|
||||||
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker.
|
|
||||||
*/
|
|
||||||
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
|
|
||||||
{
|
|
||||||
Console::title('Scheduler V1');
|
|
||||||
Console::success(APP_NAME . ' Scheduler v1 has started');
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Extract only nessessary attributes to lower memory used.
|
|
||||||
*
|
|
||||||
* @var Document $schedule
|
|
||||||
* @return array
|
|
||||||
*/
|
|
||||||
$getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array {
|
|
||||||
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId'));
|
|
||||||
|
|
||||||
$function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId'));
|
|
||||||
|
|
||||||
return [
|
|
||||||
'resourceId' => $schedule->getAttribute('resourceId'),
|
|
||||||
'schedule' => $schedule->getAttribute('schedule'),
|
|
||||||
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
|
||||||
'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
|
|
||||||
'function' => $function, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
|
|
||||||
];
|
|
||||||
};
|
|
||||||
|
|
||||||
$schedules = []; // Local copy of 'schedules' collection
|
|
||||||
$lastSyncUpdate = DateTime::now();
|
|
||||||
|
|
||||||
$limit = 10000;
|
|
||||||
$sum = $limit;
|
|
||||||
$total = 0;
|
|
||||||
$loadStart = \microtime(true);
|
|
||||||
$latestDocument = null;
|
|
||||||
|
|
||||||
while ($sum === $limit) {
|
|
||||||
$paginationQueries = [Query::limit($limit)];
|
|
||||||
if ($latestDocument !== null) {
|
|
||||||
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
|
||||||
}
|
|
||||||
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
|
||||||
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
|
|
||||||
Query::equal('resourceType', ['function']),
|
|
||||||
Query::equal('active', [true]),
|
|
||||||
]));
|
|
||||||
|
|
||||||
$sum = count($results);
|
|
||||||
$total = $total + $sum;
|
|
||||||
foreach ($results as $document) {
|
|
||||||
try {
|
|
||||||
$schedules[$document['resourceId']] = $getSchedule($document);
|
|
||||||
} catch (\Throwable $th) {
|
|
||||||
Console::error("Failed to load schedule for project {$document['projectId']} and function {$document['resourceId']}");
|
|
||||||
Console::error($th->getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
$pools->reclaim();
|
|
||||||
|
|
||||||
Console::success("{$total} functions were loaded in " . (microtime(true) - $loadStart) . " seconds");
|
|
||||||
|
|
||||||
Console::success("Starting timers at " . DateTime::now());
|
|
||||||
|
|
||||||
run(
|
|
||||||
function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) {
|
|
||||||
/**
|
|
||||||
* The timer synchronize $schedules copy with database collection.
|
|
||||||
*/
|
|
||||||
Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) {
|
|
||||||
$time = DateTime::now();
|
|
||||||
$timerStart = \microtime(true);
|
|
||||||
|
|
||||||
$limit = 1000;
|
|
||||||
$sum = $limit;
|
|
||||||
$total = 0;
|
|
||||||
$latestDocument = null;
|
|
||||||
|
|
||||||
Console::log("Sync tick: Running at $time");
|
|
||||||
|
|
||||||
while ($sum === $limit) {
|
|
||||||
$paginationQueries = [Query::limit($limit)];
|
|
||||||
if ($latestDocument !== null) {
|
|
||||||
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
|
||||||
}
|
|
||||||
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
|
||||||
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
|
|
||||||
Query::equal('resourceType', ['function']),
|
|
||||||
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
|
|
||||||
]));
|
|
||||||
|
|
||||||
$sum = count($results);
|
|
||||||
$total = $total + $sum;
|
|
||||||
foreach ($results as $document) {
|
|
||||||
$localDocument = $schedules[$document['resourceId']] ?? null;
|
|
||||||
|
|
||||||
$org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null;
|
|
||||||
$new = strtotime($document['resourceUpdatedAt']);
|
|
||||||
|
|
||||||
if ($document['active'] === false) {
|
|
||||||
Console::info("Removing: {$document['resourceId']}");
|
|
||||||
unset($schedules[$document['resourceId']]);
|
|
||||||
} elseif ($new !== $org) {
|
|
||||||
Console::info("Updating: {$document['resourceId']}");
|
|
||||||
$schedules[$document['resourceId']] = $getSchedule($document);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
$lastSyncUpdate = $time;
|
|
||||||
$timerEnd = \microtime(true);
|
|
||||||
|
|
||||||
$pools->reclaim();
|
|
||||||
|
|
||||||
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The timer to prepare soon-to-execute schedules.
|
|
||||||
*/
|
|
||||||
$lastEnqueueUpdate = null;
|
|
||||||
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate, $pools) {
|
|
||||||
$timerStart = \microtime(true);
|
|
||||||
$time = DateTime::now();
|
|
||||||
|
|
||||||
$enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate;
|
|
||||||
$timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff);
|
|
||||||
|
|
||||||
Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");
|
|
||||||
|
|
||||||
$total = 0;
|
|
||||||
|
|
||||||
$delayedExecutions = []; // Group executions with same delay to share one coroutine
|
|
||||||
|
|
||||||
foreach ($schedules as $key => $schedule) {
|
|
||||||
$cron = new CronExpression($schedule['schedule']);
|
|
||||||
$nextDate = $cron->getNextRunDate();
|
|
||||||
$next = DateTime::format($nextDate);
|
|
||||||
|
|
||||||
$currentTick = $next < $timeFrame;
|
|
||||||
|
|
||||||
if (!$currentTick) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$total++;
|
|
||||||
|
|
||||||
$promiseStart = \time(); // in seconds
|
|
||||||
$executionStart = $nextDate->getTimestamp(); // in seconds
|
|
||||||
$delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
|
|
||||||
|
|
||||||
if (!isset($delayedExecutions[$delay])) {
|
|
||||||
$delayedExecutions[$delay] = [];
|
|
||||||
}
|
|
||||||
|
|
||||||
$delayedExecutions[$delay][] = $key;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($delayedExecutions as $delay => $scheduleKeys) {
|
|
||||||
\go(function () use ($delay, $schedules, $scheduleKeys, $pools) {
|
|
||||||
\sleep($delay); // in seconds
|
|
||||||
|
|
||||||
$queue = $pools->get('queue')->pop();
|
|
||||||
$connection = $queue->getResource();
|
|
||||||
|
|
||||||
foreach ($scheduleKeys as $scheduleKey) {
|
|
||||||
// Ensure schedule was not deleted
|
|
||||||
if (!isset($schedules[$scheduleKey])) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$schedule = $schedules[$scheduleKey];
|
|
||||||
|
|
||||||
$functions = new Func($connection);
|
|
||||||
|
|
||||||
$functions
|
|
||||||
->setType('schedule')
|
|
||||||
->setFunction($schedule['function'])
|
|
||||||
->setMethod('POST')
|
|
||||||
->setPath('/')
|
|
||||||
->setProject($schedule['project'])
|
|
||||||
->trigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
$queue->reclaim();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
$timerEnd = \microtime(true);
|
|
||||||
$lastEnqueueUpdate = $timerStart;
|
|
||||||
Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds");
|
|
||||||
};
|
|
||||||
|
|
||||||
Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions());
|
|
||||||
$enqueueFunctions();
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
187
src/Appwrite/Platform/Tasks/ScheduleBase.php
Normal file
187
src/Appwrite/Platform/Tasks/ScheduleBase.php
Normal file
|
@ -0,0 +1,187 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Tasks;
|
||||||
|
|
||||||
|
use Cron\CronExpression;
|
||||||
|
use Swoole\Timer;
|
||||||
|
use Utopia\App;
|
||||||
|
use Utopia\Database\Exception;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\DateTime;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Query;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Pools\Group;
|
||||||
|
use Appwrite\Event\Func;
|
||||||
|
|
||||||
|
use function Swoole\Coroutine\run;
|
||||||
|
|
||||||
|
abstract class ScheduleBase extends Action
|
||||||
|
{
|
||||||
|
protected const UPDATE_TIMER = 10; //seconds
|
||||||
|
protected const ENQUEUE_TIMER = 60; //seconds
|
||||||
|
|
||||||
|
protected array $schedules = [];
|
||||||
|
|
||||||
|
abstract public static function getName(): string;
|
||||||
|
abstract public static function getSupportedResource(): string;
|
||||||
|
|
||||||
|
abstract protected function enqueueResources(
|
||||||
|
Group $pools,
|
||||||
|
Database $dbForConsole
|
||||||
|
);
|
||||||
|
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
$type = static::getSupportedResource();
|
||||||
|
|
||||||
|
$this
|
||||||
|
->desc("Execute {$type}s scheduled in Appwrite")
|
||||||
|
->inject('pools')
|
||||||
|
->inject('dbForConsole')
|
||||||
|
->inject('getProjectDB')
|
||||||
|
->callback(fn(Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 1. Load all documents from 'schedules' collection to create local copy
|
||||||
|
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
|
||||||
|
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
|
||||||
|
*/
|
||||||
|
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
|
||||||
|
{
|
||||||
|
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
|
||||||
|
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract only necessary attributes to lower memory used.
|
||||||
|
*
|
||||||
|
* @return array
|
||||||
|
* @throws Exception
|
||||||
|
* @var Document $schedule
|
||||||
|
*/
|
||||||
|
$getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array {
|
||||||
|
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId'));
|
||||||
|
|
||||||
|
$resource = $getProjectDB($project)->getDocument(
|
||||||
|
$schedule->getAttribute('resourceCollection'),
|
||||||
|
$schedule->getAttribute('resourceId')
|
||||||
|
);
|
||||||
|
|
||||||
|
return [
|
||||||
|
'resourceId' => $schedule->getAttribute('resourceId'),
|
||||||
|
'schedule' => $schedule->getAttribute('schedule'),
|
||||||
|
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
||||||
|
'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
|
||||||
|
'resource' => $resource, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
|
||||||
|
];
|
||||||
|
};
|
||||||
|
|
||||||
|
$lastSyncUpdate = DateTime::now();
|
||||||
|
|
||||||
|
$limit = 10_000;
|
||||||
|
$sum = $limit;
|
||||||
|
$total = 0;
|
||||||
|
$loadStart = \microtime(true);
|
||||||
|
$latestDocument = null;
|
||||||
|
|
||||||
|
while ($sum === $limit) {
|
||||||
|
$paginationQueries = [Query::limit($limit)];
|
||||||
|
|
||||||
|
if ($latestDocument) {
|
||||||
|
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
||||||
|
}
|
||||||
|
|
||||||
|
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
||||||
|
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
|
||||||
|
Query::equal('resourceType', [static::getSupportedResource()]),
|
||||||
|
Query::equal('active', [true]),
|
||||||
|
]));
|
||||||
|
|
||||||
|
$sum = \count($results);
|
||||||
|
$total = $total + $sum;
|
||||||
|
|
||||||
|
foreach ($results as $document) {
|
||||||
|
try {
|
||||||
|
$this->schedules[$document['resourceId']] = $getSchedule($document);
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
Console::error("Failed to load schedule for project {$document['projectId']} {$document['resourceCollection']} {$document['resourceId']}");
|
||||||
|
Console::error($th->getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$latestDocument = \end($results);
|
||||||
|
}
|
||||||
|
|
||||||
|
$pools->reclaim();
|
||||||
|
|
||||||
|
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
|
||||||
|
|
||||||
|
Console::success("Starting timers at " . DateTime::now());
|
||||||
|
|
||||||
|
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
|
||||||
|
/**
|
||||||
|
* The timer synchronize $schedules copy with database collection.
|
||||||
|
*/
|
||||||
|
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
|
||||||
|
$time = DateTime::now();
|
||||||
|
$timerStart = \microtime(true);
|
||||||
|
|
||||||
|
$limit = 1000;
|
||||||
|
$sum = $limit;
|
||||||
|
$total = 0;
|
||||||
|
$latestDocument = null;
|
||||||
|
|
||||||
|
Console::log("Sync tick: Running at $time");
|
||||||
|
|
||||||
|
while ($sum === $limit) {
|
||||||
|
$paginationQueries = [Query::limit($limit)];
|
||||||
|
|
||||||
|
if ($latestDocument) {
|
||||||
|
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
||||||
|
}
|
||||||
|
|
||||||
|
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
||||||
|
Query::equal('region', [App::getEnv('_APP_REGION', 'default')]),
|
||||||
|
Query::equal('resourceType', [static::getSupportedResource()]),
|
||||||
|
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
|
||||||
|
]));
|
||||||
|
|
||||||
|
$sum = count($results);
|
||||||
|
$total = $total + $sum;
|
||||||
|
|
||||||
|
foreach ($results as $document) {
|
||||||
|
$localDocument = $schedules[$document['resourceId']] ?? null;
|
||||||
|
|
||||||
|
// Check if resource has been updated since last sync
|
||||||
|
$org = $localDocument !== null ? \strtotime($localDocument['resourceUpdatedAt']) : null;
|
||||||
|
$new = \strtotime($document['resourceUpdatedAt']);
|
||||||
|
|
||||||
|
if (!$document['active']) {
|
||||||
|
Console::info("Removing: {$document['resourceId']}");
|
||||||
|
unset($this->schedules[$document['resourceId']]);
|
||||||
|
} elseif ($new !== $org) {
|
||||||
|
Console::info("Updating: {$document['resourceId']}");
|
||||||
|
$this->schedules[$document['resourceId']] = $getSchedule($document);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$latestDocument = \end($results);
|
||||||
|
}
|
||||||
|
|
||||||
|
$lastSyncUpdate = $time;
|
||||||
|
$timerEnd = \microtime(true);
|
||||||
|
|
||||||
|
$pools->reclaim();
|
||||||
|
|
||||||
|
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
|
||||||
|
});
|
||||||
|
|
||||||
|
Timer::tick(static::ENQUEUE_TIMER * 1000, fn() =>
|
||||||
|
$this->enqueueResources($pools, $dbForConsole));
|
||||||
|
|
||||||
|
$this->enqueueResources($pools, $dbForConsole);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
101
src/Appwrite/Platform/Tasks/ScheduleFunctions.php
Normal file
101
src/Appwrite/Platform/Tasks/ScheduleFunctions.php
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Tasks;
|
||||||
|
|
||||||
|
use Appwrite\Event\Func;
|
||||||
|
use Cron\CronExpression;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Database\DateTime;
|
||||||
|
use Utopia\Pools\Group;
|
||||||
|
|
||||||
|
class ScheduleFunctions extends ScheduleBase
|
||||||
|
{
|
||||||
|
public const UPDATE_TIMER = 10; // seconds
|
||||||
|
public const ENQUEUE_TIMER = 60; // seconds
|
||||||
|
|
||||||
|
private ?float $lastEnqueueUpdate = null;
|
||||||
|
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'schedule-functions';
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function getSupportedResource(): string
|
||||||
|
{
|
||||||
|
return 'message';
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function enqueueResources(Group $pools, Database $dbForConsole): void
|
||||||
|
{
|
||||||
|
$timerStart = \microtime(true);
|
||||||
|
$time = DateTime::now();
|
||||||
|
|
||||||
|
$enqueueDiff = $this->lastEnqueueUpdate === null ? 0 : $timerStart - $this->lastEnqueueUpdate;
|
||||||
|
$timeFrame = DateTime::addSeconds(new \DateTime(), static::ENQUEUE_TIMER - $enqueueDiff);
|
||||||
|
|
||||||
|
Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");
|
||||||
|
|
||||||
|
$total = 0;
|
||||||
|
|
||||||
|
$delayedExecutions = []; // Group executions with same delay to share one coroutine
|
||||||
|
|
||||||
|
foreach ($this->schedules as $key => $schedule) {
|
||||||
|
$cron = new CronExpression($schedule['schedule']);
|
||||||
|
$nextDate = $cron->getNextRunDate();
|
||||||
|
$next = DateTime::format($nextDate);
|
||||||
|
|
||||||
|
$currentTick = $next < $timeFrame;
|
||||||
|
|
||||||
|
if (!$currentTick) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
$total++;
|
||||||
|
|
||||||
|
$promiseStart = \time(); // in seconds
|
||||||
|
$executionStart = $nextDate->getTimestamp(); // in seconds
|
||||||
|
$delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
|
||||||
|
|
||||||
|
if (!isset($delayedExecutions[$delay])) {
|
||||||
|
$delayedExecutions[$delay] = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
$delayedExecutions[$delay][] = $key;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($delayedExecutions as $delay => $scheduleKeys) {
|
||||||
|
\go(function () use ($delay, $scheduleKeys, $pools) {
|
||||||
|
\sleep($delay); // in seconds
|
||||||
|
|
||||||
|
$queue = $pools->get('queue')->pop();
|
||||||
|
$connection = $queue->getResource();
|
||||||
|
|
||||||
|
foreach ($scheduleKeys as $scheduleKey) {
|
||||||
|
// Ensure schedule was not deleted
|
||||||
|
if (!isset($schedules[$scheduleKey])) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$schedule = $schedules[$scheduleKey];
|
||||||
|
|
||||||
|
$queueForFunctions = new Func($connection);
|
||||||
|
|
||||||
|
$queueForFunctions
|
||||||
|
->setType('schedule')
|
||||||
|
->setFunction($schedule['resource'])
|
||||||
|
->setMethod('POST')
|
||||||
|
->setPath('/')
|
||||||
|
->setProject($schedule['project'])
|
||||||
|
->trigger();
|
||||||
|
}
|
||||||
|
|
||||||
|
$queue->reclaim();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
$timerEnd = \microtime(true);
|
||||||
|
$this->lastEnqueueUpdate = $timerStart;
|
||||||
|
Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds");
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,162 +0,0 @@
|
||||||
<?php
|
|
||||||
|
|
||||||
namespace Appwrite\Platform\Tasks;
|
|
||||||
|
|
||||||
use Appwrite\Event\Delete;
|
|
||||||
use Swoole\Timer;
|
|
||||||
use Utopia\Platform\Action;
|
|
||||||
use Utopia\CLI\Console;
|
|
||||||
use Utopia\Database\DateTime;
|
|
||||||
use Utopia\Database\Query;
|
|
||||||
use Utopia\Database\Database;
|
|
||||||
use Utopia\Pools\Group;
|
|
||||||
use Appwrite\Event\Messaging;
|
|
||||||
|
|
||||||
use function Swoole\Coroutine\run;
|
|
||||||
|
|
||||||
class ScheduleMessage extends Action
|
|
||||||
{
|
|
||||||
public const MESSAGE_UPDATE_TIMER = 10; //seconds
|
|
||||||
public const MESSAGE_ENQUEUE_TIMER = 60; //seconds
|
|
||||||
|
|
||||||
public static function getName(): string
|
|
||||||
{
|
|
||||||
return 'schedule-message';
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __construct()
|
|
||||||
{
|
|
||||||
$this
|
|
||||||
->desc('Execute functions scheduled in Appwrite')
|
|
||||||
->inject('pools')
|
|
||||||
->inject('dbForConsole')
|
|
||||||
->inject('getProjectDB')
|
|
||||||
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 1. Load all documents from 'schedules' collection to create local copy
|
|
||||||
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
|
|
||||||
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker.
|
|
||||||
*/
|
|
||||||
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
|
|
||||||
{
|
|
||||||
Console::title('Scheduler V1');
|
|
||||||
Console::success(APP_NAME . ' Scheduler v1 has started');
|
|
||||||
|
|
||||||
$schedules = []; // Local copy of 'schedules' collection
|
|
||||||
$lastSyncUpdate = DateTime::now();
|
|
||||||
|
|
||||||
$limit = 10000;
|
|
||||||
$sum = $limit;
|
|
||||||
$total = 0;
|
|
||||||
$loadStart = \microtime(true);
|
|
||||||
$latestDocument = null;
|
|
||||||
|
|
||||||
while ($sum === $limit) {
|
|
||||||
$paginationQueries = [Query::limit($limit)];
|
|
||||||
if ($latestDocument !== null) {
|
|
||||||
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
|
||||||
Query::lessThanEqual('schedule', DateTime::formatTz(DateTime::now())),
|
|
||||||
Query::equal('resourceType', ['message']),
|
|
||||||
Query::equal('active', [true]),
|
|
||||||
]));
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
var_dump($e->getTraceAsString());
|
|
||||||
}
|
|
||||||
|
|
||||||
$sum = count($results);
|
|
||||||
$total = $total + $sum;
|
|
||||||
foreach ($results as $schedule) {
|
|
||||||
$schedules[$schedule->getId()] = $schedule;
|
|
||||||
}
|
|
||||||
|
|
||||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
$pools->reclaim();
|
|
||||||
|
|
||||||
Console::success("{$total} message were loaded in " . (microtime(true) - $loadStart) . " seconds");
|
|
||||||
|
|
||||||
Console::success("Starting timers at " . DateTime::now());
|
|
||||||
|
|
||||||
run(
|
|
||||||
function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $pools) {
|
|
||||||
/**
|
|
||||||
* The timer synchronize $schedules copy with database collection.
|
|
||||||
*/
|
|
||||||
Timer::tick(self::MESSAGE_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $pools) {
|
|
||||||
$time = DateTime::now();
|
|
||||||
$timerStart = \microtime(true);
|
|
||||||
|
|
||||||
$limit = 1000;
|
|
||||||
$sum = $limit;
|
|
||||||
$total = 0;
|
|
||||||
$latestDocument = null;
|
|
||||||
|
|
||||||
Console::log("Sync tick: Running at $time");
|
|
||||||
|
|
||||||
while ($sum === $limit) {
|
|
||||||
$paginationQueries = [Query::limit($limit)];
|
|
||||||
if ($latestDocument !== null) {
|
|
||||||
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
|
||||||
}
|
|
||||||
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
|
||||||
Query::lessThanEqual('schedule', DateTime::formatTz(DateTime::now())),
|
|
||||||
Query::equal('resourceType', ['message']),
|
|
||||||
Query::equal('active', [true]),
|
|
||||||
]));
|
|
||||||
$sum = \count($results);
|
|
||||||
$total = $total + $sum;
|
|
||||||
foreach ($results as $schedule) {
|
|
||||||
$schedules[$schedule->getId()] = $schedule;
|
|
||||||
}
|
|
||||||
|
|
||||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
|
||||||
}
|
|
||||||
|
|
||||||
$lastSyncUpdate = $time;
|
|
||||||
$timerEnd = \microtime(true);
|
|
||||||
|
|
||||||
$pools->reclaim();
|
|
||||||
|
|
||||||
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
|
|
||||||
});
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The timer to prepare soon-to-execute schedules.
|
|
||||||
*/
|
|
||||||
$enqueueMessages = function () use (&$schedules, $pools, $dbForConsole) {
|
|
||||||
foreach ($schedules as $scheduleId => $schedule) {
|
|
||||||
\go(function () use (&$schedules, $schedule, $pools, $dbForConsole) {
|
|
||||||
$queue = $pools->get('queue')->pop();
|
|
||||||
$connection = $queue->getResource();
|
|
||||||
$queueForMessaging = new Messaging($connection);
|
|
||||||
$queueForDeletes = new Delete($connection);
|
|
||||||
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId'));
|
|
||||||
$queueForMessaging
|
|
||||||
->setMessageId($schedule->getAttribute('resourceId'))
|
|
||||||
->setProject($project)
|
|
||||||
->trigger();
|
|
||||||
$schedule->setAttribute('active', false);
|
|
||||||
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
|
|
||||||
|
|
||||||
$queueForDeletes
|
|
||||||
->setType(DELETE_TYPE_SCHEDULES)
|
|
||||||
->setDocument($schedule);
|
|
||||||
|
|
||||||
$queue->reclaim();
|
|
||||||
unset($schedules[$schedule->getId()]);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Timer::tick(self::MESSAGE_ENQUEUE_TIMER * 1000, fn () => $enqueueMessages());
|
|
||||||
$enqueueMessages();
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
57
src/Appwrite/Platform/Tasks/ScheduleMessages.php
Normal file
57
src/Appwrite/Platform/Tasks/ScheduleMessages.php
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Tasks;
|
||||||
|
|
||||||
|
use Appwrite\Event\Delete;
|
||||||
|
use Swoole\Timer;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\DateTime;
|
||||||
|
use Utopia\Database\Query;
|
||||||
|
use Utopia\Database\Database;
|
||||||
|
use Utopia\Pools\Group;
|
||||||
|
use Appwrite\Event\Messaging;
|
||||||
|
|
||||||
|
use function Swoole\Coroutine\run;
|
||||||
|
|
||||||
|
class ScheduleMessages extends ScheduleBase
|
||||||
|
{
|
||||||
|
public const UPDATE_TIMER = 3; // seconds
|
||||||
|
public const ENQUEUE_TIMER = 60; // seconds
|
||||||
|
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'schedule-messages';
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function getSupportedResource(): string
|
||||||
|
{
|
||||||
|
return 'message';
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function enqueueResources(Group $pools, Database $dbForConsole): void
|
||||||
|
{
|
||||||
|
foreach ($this->schedules as $schedule) {
|
||||||
|
\go(function () use ($schedule, $pools, $dbForConsole) {
|
||||||
|
$queue = $pools->get('queue')->pop();
|
||||||
|
$connection = $queue->getResource();
|
||||||
|
$queueForMessaging = new Messaging($connection);
|
||||||
|
$queueForDeletes = new Delete($connection);
|
||||||
|
|
||||||
|
$queueForMessaging
|
||||||
|
->setMessageId($schedule['resourceId'])
|
||||||
|
->setProject($schedule['project'])
|
||||||
|
->trigger();
|
||||||
|
|
||||||
|
$queueForDeletes
|
||||||
|
->setType(DELETE_TYPE_SCHEDULES)
|
||||||
|
->setDocument($schedule)
|
||||||
|
->trigger();
|
||||||
|
|
||||||
|
$queue->reclaim();
|
||||||
|
|
||||||
|
unset($this->schedules[$schedule->getId()]);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue