replacing offset with pagination
This commit is contained in:
parent
4409217521
commit
3768912412
1 changed files with 41 additions and 24 deletions
|
@ -69,62 +69,70 @@ $cli
|
|||
};
|
||||
|
||||
$dbForConsole = getConsoleDB();
|
||||
$limit = 200;
|
||||
$limit = 10000;
|
||||
$sum = $limit;
|
||||
$functions = [];
|
||||
$queue = [];
|
||||
$count = 0;
|
||||
$loadStart = \microtime(true);
|
||||
$total = 0;
|
||||
/**
|
||||
* Initial run fill $functions list
|
||||
*/
|
||||
$loadStart = \microtime(true);
|
||||
$latestDocument = null;
|
||||
|
||||
while ($sum === $limit) {
|
||||
$results = $dbForConsole->find('schedules', [
|
||||
$paginationQueries = [Query::limit($limit)];
|
||||
if ($latestDocument !== null) {
|
||||
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
||||
}
|
||||
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||
Query::equal('resourceType', ['function']),
|
||||
Query::equal('active', [true]),
|
||||
Query::offset($count * $limit),
|
||||
Query::limit($limit),
|
||||
]);
|
||||
]));
|
||||
|
||||
$sum = count($results);
|
||||
$total = $total + $sum;
|
||||
foreach ($results as $document) {
|
||||
$functions[$document['resourceId']] = $document;
|
||||
$functions[$document['resourceId']] = [
|
||||
'resourceId' => $document->getAttribute('resourceId'),
|
||||
'resourceUpdatedAt' => $document->getAttribute('resourceUpdatedAt'),
|
||||
'schedule' => $document->getAttribute('schedule'),
|
||||
];
|
||||
}
|
||||
$count++;
|
||||
|
||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||
}
|
||||
|
||||
$loadEnd = \microtime(true);
|
||||
Console::error("{$total} functions where loaded in " . ($loadEnd - $loadStart) . " seconds");
|
||||
|
||||
$createQueue();
|
||||
|
||||
$lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_VALIDATION_TIMER);
|
||||
|
||||
/**
|
||||
* The timer updates $functions from db on last resourceUpdatedAt attr in X-min.
|
||||
*/
|
||||
Co\run(
|
||||
function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
Timer::tick(FUNCTION_VALIDATION_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
|
||||
$time = DateTime::now();
|
||||
$count = 0;
|
||||
$limit = 200;
|
||||
$sum = $limit;
|
||||
$total = 0;
|
||||
$latestDocument = null;
|
||||
$timerStart = \microtime(true);
|
||||
|
||||
Console::info("Update proc run at: $time last update was at $lastUpdate");
|
||||
/**
|
||||
* Updating functions list from DB.
|
||||
*/
|
||||
while (!empty($sum)) {
|
||||
$results = $dbForConsole->find('schedules', [
|
||||
|
||||
while ($sum === $limit) {
|
||||
$paginationQueries = [Query::limit($limit)];
|
||||
if ($latestDocument !== null) {
|
||||
$paginationQueries[] = Query::cursorAfter($latestDocument);
|
||||
}
|
||||
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
|
||||
Query::equal('region', [App::getEnv('_APP_REGION')]),
|
||||
Query::equal('resourceType', ['function']),
|
||||
Query::greaterThan('resourceUpdatedAt', $lastUpdate),
|
||||
Query::limit($limit),
|
||||
Query::offset($count * $limit),
|
||||
]);
|
||||
]));
|
||||
|
||||
$sum = count($results);
|
||||
$total = $total + $sum;
|
||||
foreach ($results as $document) {
|
||||
|
@ -135,12 +143,18 @@ $cli
|
|||
unset($functions[$document['resourceId']]);
|
||||
} elseif ($new > $org) {
|
||||
Console::error("Updating: {$document['resourceId']}");
|
||||
$functions[$document['resourceId']] = $document;
|
||||
$functions[$document['resourceId']] = [
|
||||
'resourceId' => $document->getAttribute('resourceId'),
|
||||
'resourceUpdatedAt' => $document->getAttribute('resourceUpdatedAt'),
|
||||
'schedule' => $document->getAttribute('schedule'),
|
||||
];
|
||||
}
|
||||
$removeFromQueue($document['resourceId']);
|
||||
}
|
||||
$count++;
|
||||
|
||||
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
|
||||
}
|
||||
|
||||
$lastUpdate = DateTime::now();
|
||||
$createQueue();
|
||||
$timerEnd = \microtime(true);
|
||||
|
@ -148,6 +162,9 @@ $cli
|
|||
Console::error("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds");
|
||||
});
|
||||
|
||||
/**
|
||||
* The timer sends to worker every 1 min and re-enqueue matched functions.
|
||||
*/
|
||||
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) {
|
||||
$timerStart = \microtime(true);
|
||||
$time = DateTime::now();
|
||||
|
|
Loading…
Reference in a new issue