infinity worker
This commit is contained in:
parent
4dc2b57502
commit
15aeea7a44
14 changed files with 349 additions and 5 deletions
|
@ -63,6 +63,7 @@ RUN chmod +x /usr/local/bin/doctor && \
|
|||
chmod +x /usr/local/bin/schedule-functions && \
|
||||
chmod +x /usr/local/bin/schedule-executions && \
|
||||
chmod +x /usr/local/bin/schedule-messages && \
|
||||
chmod +x /usr/local/bin/infinity-scheduler && \
|
||||
chmod +x /usr/local/bin/sdks && \
|
||||
chmod +x /usr/local/bin/specs && \
|
||||
chmod +x /usr/local/bin/ssl && \
|
||||
|
@ -84,7 +85,8 @@ RUN chmod +x /usr/local/bin/doctor && \
|
|||
chmod +x /usr/local/bin/worker-migrations && \
|
||||
chmod +x /usr/local/bin/worker-webhooks && \
|
||||
chmod +x /usr/local/bin/worker-usage && \
|
||||
chmod +x /usr/local/bin/worker-usage-dump
|
||||
chmod +x /usr/local/bin/worker-usage-dump && \
|
||||
chmod +x /usr/local/bin/worker-usage-infinity
|
||||
|
||||
# Letsencrypt Permissions
|
||||
RUN mkdir -p /etc/letsencrypt/live/ && chmod -Rf 755 /etc/letsencrypt/live/
|
||||
|
|
|
@ -6,6 +6,8 @@ require_once __DIR__ . '/controllers/general.php';
|
|||
use Appwrite\Event\Certificate;
|
||||
use Appwrite\Event\Delete;
|
||||
use Appwrite\Event\Func;
|
||||
use Appwrite\Event\UsageDump;
|
||||
use Appwrite\Event\UsageInfinity;
|
||||
use Appwrite\Platform\Appwrite;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
|
@ -20,6 +22,7 @@ use Utopia\Logger\Log;
|
|||
use Utopia\Platform\Service;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Queue\Connection;
|
||||
use Utopia\Queue\Server;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\System\System;
|
||||
|
||||
|
@ -165,6 +168,9 @@ CLI::setResource('queueForDeletes', function (Connection $queue) {
|
|||
CLI::setResource('queueForCertificates', function (Connection $queue) {
|
||||
return new Certificate($queue);
|
||||
}, ['queue']);
|
||||
CLI::setResource('queueForUsageInfinity', function (Connection $queue) {
|
||||
return new UsageInfinity($queue);
|
||||
}, ['queue']);
|
||||
CLI::setResource('logError', function (Registry $register) {
|
||||
return function (Throwable $error, string $namespace, string $action) use ($register) {
|
||||
$logger = $register->get('logger');
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -14,11 +14,13 @@ use Appwrite\Event\Messaging;
|
|||
use Appwrite\Event\Migration;
|
||||
use Appwrite\Event\Usage;
|
||||
use Appwrite\Event\UsageDump;
|
||||
use Appwrite\Event\UsageInfinity;
|
||||
use Appwrite\Platform\Appwrite;
|
||||
use Swoole\Runtime;
|
||||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\CLI;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Database;
|
||||
|
@ -204,6 +206,10 @@ Server::setResource('queueForUsageDump', function (Connection $queue) {
|
|||
return new UsageDump($queue);
|
||||
}, ['queue']);
|
||||
|
||||
Server::setResource('queueForUsageInfinity', function (Connection $queue) {
|
||||
return new UsageInfinity($queue);
|
||||
}, ['queue']);
|
||||
|
||||
Server::setResource('queue', function (Group $pools) {
|
||||
return $pools->get('queue')->pop()->getResource();
|
||||
}, ['pools']);
|
||||
|
|
3
bin/infinity-scheduler
Normal file
3
bin/infinity-scheduler
Normal file
|
@ -0,0 +1,3 @@
|
|||
#!/bin/sh
|
||||
|
||||
php /usr/src/code/app/cli.php infinity-usage-scheduler $@
|
3
bin/worker-usage-infinity
Normal file
3
bin/worker-usage-infinity
Normal file
|
@ -0,0 +1,3 @@
|
|||
#!/bin/sh
|
||||
|
||||
php /usr/src/code/app/worker.php usage-infinity $@
|
|
@ -715,6 +715,38 @@ services:
|
|||
- _APP_MAINTENANCE_DELAY
|
||||
- _APP_DATABASE_SHARED_TABLES
|
||||
|
||||
appwrite-infinity-usage-scheduler:
|
||||
entrypoint: infinity-scheduler
|
||||
<<: *x-logging
|
||||
container_name: appwrite-infinity-usage-scheduler
|
||||
image: appwrite-dev
|
||||
networks:
|
||||
- appwrite
|
||||
volumes:
|
||||
- ./app:/usr/src/code/app
|
||||
- ./src:/usr/src/code/src
|
||||
depends_on:
|
||||
- redis
|
||||
- mariadb
|
||||
environment:
|
||||
- _APP_ENV
|
||||
- _APP_WORKER_PER_CORE
|
||||
- _APP_DOMAIN
|
||||
- _APP_DOMAIN_TARGET
|
||||
- _APP_DOMAIN_FUNCTIONS
|
||||
- _APP_OPENSSL_KEY_V1
|
||||
- _APP_REDIS_HOST
|
||||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
- _APP_REDIS_PASS
|
||||
- _APP_DB_HOST
|
||||
- _APP_DB_PORT
|
||||
- _APP_DB_SCHEMA
|
||||
- _APP_DB_USER
|
||||
- _APP_DB_PASS
|
||||
- _APP_USAGE_INF_TIME
|
||||
- _APP_USAGE_INF_INTERVAL
|
||||
|
||||
appwrite-worker-usage:
|
||||
entrypoint: worker-usage
|
||||
<<: *x-logging
|
||||
|
@ -777,6 +809,37 @@ services:
|
|||
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||
- _APP_DATABASE_SHARED_TABLES
|
||||
|
||||
appwrite-worker-usage-infinity:
|
||||
entrypoint: worker-usage-infinity
|
||||
<<: *x-logging
|
||||
container_name: appwrite-worker-usage-infinity
|
||||
image: appwrite-dev
|
||||
networks:
|
||||
- appwrite
|
||||
volumes:
|
||||
- ./app:/usr/src/code/app
|
||||
- ./src:/usr/src/code/src
|
||||
depends_on:
|
||||
- redis
|
||||
- mariadb
|
||||
environment:
|
||||
- _APP_ENV
|
||||
- _APP_WORKER_PER_CORE
|
||||
- _APP_OPENSSL_KEY_V1
|
||||
- _APP_DB_HOST
|
||||
- _APP_DB_PORT
|
||||
- _APP_DB_SCHEMA
|
||||
- _APP_DB_USER
|
||||
- _APP_DB_PASS
|
||||
- _APP_REDIS_HOST
|
||||
- _APP_REDIS_PORT
|
||||
- _APP_REDIS_USER
|
||||
- _APP_REDIS_PASS
|
||||
- _APP_USAGE_STATS
|
||||
- _APP_LOGGING_CONFIG
|
||||
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||
- _APP_DATABASE_SHARED_TABLES
|
||||
|
||||
appwrite-task-scheduler-functions:
|
||||
entrypoint: schedule-functions
|
||||
<<: *x-logging
|
||||
|
|
|
@ -30,6 +30,9 @@ class Event
|
|||
public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump';
|
||||
public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1';
|
||||
|
||||
public const USAGE_INFINITY_QUEUE_NAME = 'v1-usage-infinity';
|
||||
public const USAGE_INFINITY_CLASS_NAME = 'UsageInfinityV1';
|
||||
|
||||
public const WEBHOOK_QUEUE_NAME = 'v1-webhooks';
|
||||
public const WEBHOOK_CLASS_NAME = 'WebhooksV1';
|
||||
|
||||
|
|
34
src/Appwrite/Event/UsageInfinity.php
Normal file
34
src/Appwrite/Event/UsageInfinity.php
Normal file
|
@ -0,0 +1,34 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Event;
|
||||
|
||||
use Utopia\Queue\Client;
|
||||
use Utopia\Queue\Connection;
|
||||
|
||||
class UsageInfinity extends Event
|
||||
{
|
||||
|
||||
public function __construct(protected Connection $connection)
|
||||
{
|
||||
parent::__construct($connection);
|
||||
|
||||
$this
|
||||
->setQueue(Event::USAGE_INFINITY_QUEUE_NAME)
|
||||
->setClass(Event::USAGE_INFINITY_CLASS_NAME);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sends project to the usage infinity worker.
|
||||
*
|
||||
* @return string|bool
|
||||
*/
|
||||
public function trigger(): string|bool
|
||||
{
|
||||
var_dump($this->getProject());
|
||||
$client = new Client($this->queue, $this->connection);
|
||||
return $client->enqueue([
|
||||
'project' => $this->getProject(),
|
||||
]);
|
||||
}
|
||||
}
|
|
@ -17,6 +17,7 @@ use Appwrite\Platform\Tasks\SSL;
|
|||
use Appwrite\Platform\Tasks\Upgrade;
|
||||
use Appwrite\Platform\Tasks\Vars;
|
||||
use Appwrite\Platform\Tasks\Version;
|
||||
use Appwrite\Platform\Tasks\InfinityUsageScheduler;
|
||||
use Utopia\Platform\Service;
|
||||
|
||||
class Tasks extends Service
|
||||
|
@ -40,6 +41,7 @@ class Tasks extends Service
|
|||
->addAction(Upgrade::getName(), new Upgrade())
|
||||
->addAction(Vars::getName(), new Vars())
|
||||
->addAction(Version::getName(), new Version())
|
||||
->addAction(InfinityUsageScheduler::getName(), new InfinityUsageScheduler())
|
||||
;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,9 +11,10 @@ use Appwrite\Platform\Workers\Functions;
|
|||
use Appwrite\Platform\Workers\Mails;
|
||||
use Appwrite\Platform\Workers\Messaging;
|
||||
use Appwrite\Platform\Workers\Migrations;
|
||||
use Appwrite\Platform\Workers\Webhooks;
|
||||
use Appwrite\Platform\Workers\Usage;
|
||||
use Appwrite\Platform\Workers\UsageDump;
|
||||
use Appwrite\Platform\Workers\Webhooks;
|
||||
use Appwrite\Platform\Workers\UsageInfinity;
|
||||
use Utopia\Platform\Service;
|
||||
|
||||
class Workers extends Service
|
||||
|
@ -34,7 +35,7 @@ class Workers extends Service
|
|||
->addAction(UsageDump::getName(), new UsageDump())
|
||||
->addAction(Usage::getName(), new Usage())
|
||||
->addAction(Migrations::getName(), new Migrations())
|
||||
|
||||
->addAction(UsageInfinity::getName(), new UsageInfinity())
|
||||
;
|
||||
}
|
||||
}
|
||||
|
|
108
src/Appwrite/Platform/Tasks/infinityUsageScheduler.php
Normal file
108
src/Appwrite/Platform/Tasks/infinityUsageScheduler.php
Normal file
|
@ -0,0 +1,108 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Platform\Tasks;
|
||||
|
||||
use Appwrite\Event\UsageInfinity;
|
||||
use Exception;
|
||||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Platform\Action;
|
||||
|
||||
class InfinityUsageScheduler extends Action
|
||||
{
|
||||
public static function getName(): string
|
||||
{
|
||||
return 'infinity-usage-scheduler';
|
||||
}
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$this
|
||||
->desc('Get infinity stats for projects')
|
||||
->inject('dbForConsole')
|
||||
->inject('queueForUsageInfinity')
|
||||
->callback(fn (Database $dbForConsole, UsageInfinity $queueForUsageInfinity) => $this->action($dbForConsole, $queueForUsageInfinity));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Database $dbForConsole
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Database $dbForConsole, UsageInfinity $queueForUsageInfinity): void
|
||||
{
|
||||
Console::title('Infinity stats scheduler V1');
|
||||
Console::success(APP_NAME . ' Infinity stats scheduler process has started');
|
||||
|
||||
$sleep = (int)App::getEnv('_APP_USAGE_INF_INTERVAL', '30'); // 30 seconds (by default)
|
||||
|
||||
$jobInitTime = App::getEnv('_APP_USAGE_INF_TIME', '24:00'); // (hour:minutes)
|
||||
|
||||
$now = new \DateTime();
|
||||
$now->setTimezone(new \DateTimeZone(date_default_timezone_get()));
|
||||
|
||||
$next = new \DateTime($now->format("Y-m-d $jobInitTime"));
|
||||
$next->setTimezone(new \DateTimeZone(date_default_timezone_get()));
|
||||
|
||||
$delay = $next->getTimestamp() - $now->getTimestamp();
|
||||
|
||||
if ($delay <= 0) {
|
||||
$next->add(\DateInterval::createFromDateString('1 days'));
|
||||
$delay = $next->getTimestamp() - $now->getTimestamp();
|
||||
}
|
||||
|
||||
Console::log('[' . $now->format("Y-m-d H:i:s.v") . '] Delaying for ' . $delay . ' setting loop to [' . $next->format("Y-m-d H:i:s.v") . ']');
|
||||
|
||||
$sleep = 30;
|
||||
$delay = 0;
|
||||
|
||||
Console::loop(function () use ($dbForConsole, $queueForUsageInfinity, $sleep) {
|
||||
$now = date('d-m-Y H:i:s', time());
|
||||
Console::info("[{$now}] Queuing Cloud Usage Stats every {$sleep} seconds");
|
||||
$loopStart = microtime(true);
|
||||
|
||||
$count = 0;
|
||||
$chunk = 0;
|
||||
$limit = 50;
|
||||
$results = [];
|
||||
$sum = $limit;
|
||||
|
||||
$executionStart = \microtime(true);
|
||||
|
||||
while ($sum === $limit) {
|
||||
$chunk++;
|
||||
|
||||
$results = $dbForConsole->find('projects', \array_merge([
|
||||
Query::limit($limit),
|
||||
Query::offset($count)
|
||||
]));
|
||||
|
||||
$sum = count($results);
|
||||
|
||||
Console::log('Processing chunk #' . $chunk . '. Found ' . $sum . ' documents');
|
||||
|
||||
foreach ($results as $document) {
|
||||
|
||||
$queueForUsageInfinity->setProject($document);
|
||||
var_dump('Pushing project '.$document->getInternalId());
|
||||
$count++;
|
||||
}
|
||||
}
|
||||
|
||||
$executionEnd = \microtime(true);
|
||||
|
||||
Console::log("Processed {$count} document by group in " . ($executionEnd - $executionStart) . " seconds");
|
||||
|
||||
|
||||
$loopTook = microtime(true) - $loopStart;
|
||||
$now = date('d-m-Y H:i:s', time());
|
||||
Console::info("[{$now}] usage Stats took {$loopTook} seconds");
|
||||
}, $sleep, $delay);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
113
src/Appwrite/Platform/Workers/UsageInfinity.php
Normal file
113
src/Appwrite/Platform/Workers/UsageInfinity.php
Normal file
|
@ -0,0 +1,113 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Platform\Workers;
|
||||
|
||||
use Appwrite\Event\Mail;
|
||||
use Appwrite\Template\Template;
|
||||
use Exception;
|
||||
use phpDocumentor\Reflection\Types\Callable_;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Exception\Duplicate;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\System\System;
|
||||
|
||||
class UsageInfinity extends Action
|
||||
{
|
||||
|
||||
|
||||
public static function getName(): string
|
||||
{
|
||||
return 'usage-infinity';
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this
|
||||
->desc('Usage infinity stats worker')
|
||||
->inject('message')
|
||||
->inject('getProjectDB')
|
||||
->callback(fn (Message $message, Callable $getProjectDB) => $this->action($message, $getProjectDB));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param Database $dbForConsole
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
public function action(Message $message, Callable $getProjectDB): void
|
||||
{
|
||||
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
if (empty($payload)) {
|
||||
throw new Exception('Missing payload');
|
||||
}
|
||||
|
||||
var_dump($payload);
|
||||
|
||||
$project = new Document($payload['project']);
|
||||
$dbForProject = call_user_func($getProjectDB, $project);
|
||||
|
||||
try {
|
||||
$this->storage($dbForProject);
|
||||
Console::log('Finished project ' . $project->getId() . ' ' . $project->getInternalId());
|
||||
|
||||
} catch (\Throwable $th) {
|
||||
Console::error($th->getMessage());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
private function createInfMetric(database $dbForProject, string $metric, int|float $value): void
|
||||
{
|
||||
var_dump($metric);
|
||||
try {
|
||||
$id = \md5("_inf_{$metric}");
|
||||
$dbForProject->deleteDocument('stats', $id);
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'metric' => $metric,
|
||||
'period' => 'inf',
|
||||
'value' => (int)$value,
|
||||
'time' => null,
|
||||
'region' => 'default',
|
||||
]));
|
||||
} catch (\Throwable $th) {
|
||||
console::log("Error while creating inf metric: {$metric} {$id} " . $th->getMessage());
|
||||
}
|
||||
}
|
||||
private function storage(database $dbForProject)
|
||||
{
|
||||
$bucketsCount = 0;
|
||||
$filesCount = 0;
|
||||
$filesStorageSum = 0;
|
||||
|
||||
$buckets = $dbForProject->find('buckets');
|
||||
foreach ($buckets as $bucket) {
|
||||
$files = $dbForProject->count('bucket_' . $bucket->getInternalId());
|
||||
$this->createInfMetric($dbForProject, $bucket->getInternalId() . '.files', $files);
|
||||
|
||||
$filesStorage = $dbForProject->sum('bucket_' . $bucket->getInternalId(), 'sizeOriginal');
|
||||
$this->createInfMetric($dbForProject, $bucket->getInternalId() . '.files.storage', $filesStorage);
|
||||
|
||||
$bucketsCount++;
|
||||
$filesCount += $files;
|
||||
$filesStorageSum += $filesStorage;
|
||||
}
|
||||
|
||||
$this->createInfMetric($dbForProject, 'buckets', $bucketsCount);
|
||||
$this->createInfMetric($dbForProject, 'files', $filesCount);
|
||||
$this->createInfMetric($dbForProject, 'files.storage', $filesStorageSum);
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue