minor changes
This commit is contained in:
parent
250ea93d3f
commit
83e19a37ee
|
@ -14,7 +14,6 @@ use Swoole\Timer;
|
|||
const FUNCTION_UPDATE_TIMER = 60; //seconds
|
||||
const FUNCTION_ENQUEUE_TIMER = 60; //seconds
|
||||
const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min
|
||||
const FUNCTION_RESET_TIMER_TO = 50; // seconds
|
||||
|
||||
sleep(4);
|
||||
|
||||
|
@ -32,7 +31,6 @@ sleep(4);
|
|||
* In the end of the timer the $queue is created again.
|
||||
*
|
||||
*/
|
||||
|
||||
$cli
|
||||
->task('schedule')
|
||||
->desc('Function scheduler task')
|
||||
|
@ -59,9 +57,8 @@ $cli
|
|||
$queue[$next][$function['resourceId']] = $function;
|
||||
}
|
||||
}
|
||||
$loadEnd = \microtime(true);
|
||||
Console::success("Queue was built in " . ($loadEnd - $loadStart) . " seconds");
|
||||
//var_dump($queue);
|
||||
|
||||
Console::success("Queue was built in " . (microtime(true) - $loadStart) . " seconds");
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -69,10 +66,9 @@ $cli
|
|||
* @param string $resourceId
|
||||
* @return void
|
||||
*/
|
||||
$removeFromQueue = function (string $id, string $resourceId) use (&$queue, &$functions, $dbForConsole) {
|
||||
$removeFromQueue = function (string $resourceId) use (&$queue, &$functions, $dbForConsole) {
|
||||
if (array_key_exists($resourceId, $functions)) {
|
||||
unset($functions[$resourceId]);
|
||||
$dbForConsole->deleteDocument('schedules', $id);
|
||||
Console::error("Removing :{$resourceId} from functions list");
|
||||
}
|
||||
|
||||
|
@ -106,14 +102,17 @@ $cli
|
|||
* @var Document $schedule
|
||||
* @return array
|
||||
*/
|
||||
function getsSheduleAttributes(Document $schedule): array
|
||||
{
|
||||
$getSchedule = function (Document $schedule) use ($dbForConsole): array {
|
||||
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('schedule'));
|
||||
|
||||
return [
|
||||
'resourceId' => $schedule->getAttribute('resourceId'),
|
||||
'schedule' => $schedule->getAttribute('schedule'),
|
||||
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
||||
'project' => $project,
|
||||
//'function' => getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId'))
|
||||
];
|
||||
}
|
||||
};
|
||||
|
||||
$limit = 10000;
|
||||
$sum = $limit;
|
||||
|
@ -137,31 +136,19 @@ $cli
|
|||
$sum = count($results);
|
||||
$total = $total + $sum;
|
||||
foreach ($results as $document) {
|
||||
$functions[$document['resourceId']] = getsSheduleAttributes($document);
|
||||
$functions[$document['resourceId']] = $getSchedule($document);
|
||||
}
|
||||
|
||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||
}
|
||||
|
||||
$loadEnd = \microtime(true);
|
||||
Console::success("{$total} functions where loaded in " . ($loadEnd - $loadStart) . " seconds");
|
||||
Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds");
|
||||
$createQueue();
|
||||
$lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_UPDATE_TIMER);
|
||||
$lastUpdate = DateTime::addSeconds(new \DateTime(), -600); // 10 min
|
||||
|
||||
do {
|
||||
$second = time() % 60;
|
||||
} while ($second < FUNCTION_RESET_TIMER_TO);
|
||||
|
||||
$time = DateTime::now();
|
||||
Console::success("Starting timers at {$time}");
|
||||
|
||||
|
||||
/**
|
||||
* The timer updates $functions from db on last resourceUpdatedAt attr in X-min.
|
||||
*/
|
||||
Co\run(
|
||||
function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
function () use ($getSchedule, $updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($getSchedule, $updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
$time = DateTime::now();
|
||||
$limit = 1000;
|
||||
$sum = $limit;
|
||||
|
@ -169,7 +156,7 @@ $cli
|
|||
$latestDocument = null;
|
||||
$timerStart = \microtime(true);
|
||||
|
||||
//Console::warning("Update proc started at: $time last update was at $lastUpdate");
|
||||
Console::warning("Update proc started at: $time last update was at $lastUpdate");
|
||||
|
||||
while ($sum === $limit) {
|
||||
$paginationQueries = [Query::limit($limit)];
|
||||
|
@ -188,47 +175,29 @@ $cli
|
|||
$org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null;
|
||||
$new = strtotime($document['resourceUpdatedAt']);
|
||||
if ($document['active'] === false) {
|
||||
//Console::warning("Removing: {$document['resourceId']}");
|
||||
$removeFromQueue($document->getId(), $document['resourceId']);
|
||||
$removeFromQueue($document['resourceId']);
|
||||
} elseif ($new > $org) {
|
||||
//Console::warning("Updating: {$document['resourceId']}");
|
||||
$updateQueue($document['resourceId'], getsSheduleAttributes($document));
|
||||
$updateQueue($document['resourceId'], $getSchedule($document));
|
||||
}
|
||||
}
|
||||
|
||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||
}
|
||||
|
||||
$lastUpdate = DateTime::now();
|
||||
$createQueue();
|
||||
$timerEnd = \microtime(true);
|
||||
|
||||
//Console::warning("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds");
|
||||
Console::warning("Update timer: {$total} functions where updated in " . (microtime(true) - $timerStart) . " seconds");
|
||||
});
|
||||
|
||||
/**
|
||||
* The timer sends to worker every 1 min and re-enqueue matched functions.
|
||||
*/
|
||||
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) {
|
||||
$timerStart = \microtime(true);
|
||||
$time = DateTime::now();
|
||||
$timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME);
|
||||
$slot = (new \DateTime())->format('Y-m-d H:i:00.000');
|
||||
$prepareStart = time();
|
||||
|
||||
Console::info("Enqueue proc started at: $time");
|
||||
Console::info("Enqueue proc started at: " . DateTime::now());
|
||||
|
||||
$count = 0;
|
||||
if (array_key_exists($slot, $queue)) {
|
||||
$schedule = $queue[$slot];
|
||||
console::info(count($schedule) . " functions sent to worker for time slot " . $slot);
|
||||
$totalPreparation = time() - $prepareStart;
|
||||
|
||||
$wait = ((60 - FUNCTION_RESET_TIMER_TO) - $totalPreparation);
|
||||
Console::info("Waiting for : {$wait} seconds");
|
||||
sleep($wait);
|
||||
|
||||
$time = DateTime::now();
|
||||
Console::info("Start enqueueing at {$time}");
|
||||
|
||||
foreach ($schedule as $function) {
|
||||
if (empty($functions[$function['resourceId']])) {
|
||||
|
@ -249,11 +218,13 @@ $cli
|
|||
$queue[$next][$function['resourceId']] = $function;
|
||||
}
|
||||
unset($queue[$slot][$function['resourceId']]); /** removing function from slot */
|
||||
$count++;
|
||||
}
|
||||
unset($queue[$slot]); /** removing slot */
|
||||
}
|
||||
|
||||
$timerEnd = \microtime(true);
|
||||
Console::info("Queue timer: finished in " . ($timerEnd - $timerStart) . " seconds");
|
||||
Console::info("Queue timer: finished in " . ($timerEnd - $timerStart) . " seconds with {$count} functions");
|
||||
});
|
||||
}
|
||||
);
|
||||
|
|
|
@ -764,7 +764,7 @@ services:
|
|||
|
||||
mariadb:
|
||||
image: mariadb:10.7 # fix issues when upgrading using: mysql_upgrade -u root -p
|
||||
container_name: mariadb
|
||||
container_name: appwrite-mariadb
|
||||
<<: *x-logging
|
||||
networks:
|
||||
- appwrite
|
||||
|
|
Loading…
Reference in a new issue