diff --git a/Dockerfile b/Dockerfile index 324a3a548..4088d7c8f 100755 --- a/Dockerfile +++ b/Dockerfile @@ -95,7 +95,8 @@ RUN chmod +x /usr/local/bin/doctor && \ chmod +x /usr/local/bin/worker-webhooks && \ chmod +x /usr/local/bin/worker-migrations && \ chmod +x /usr/local/bin/worker-hamster && \ - chmod +x /usr/local/bin/worker-usage + chmod +x /usr/local/bin/worker-usage && \ + chmod +x /usr/local/bin/worker-usage-dump # Cloud Executabless diff --git a/app/controllers/api/users.php b/app/controllers/api/users.php index 38d65fba7..f291213cb 100644 --- a/app/controllers/api/users.php +++ b/app/controllers/api/users.php @@ -9,6 +9,7 @@ use Appwrite\Event\Event; use Appwrite\Network\Validator\Email; use Appwrite\Utopia\Database\Validator\CustomId; use Appwrite\Utopia\Database\Validator\Queries\Identities; +use Utopia\CLI\Console; use Utopia\Database\Validator\Queries; use Appwrite\Utopia\Database\Validator\Queries\Users; use Utopia\Database\Validator\Query\Limit; @@ -141,6 +142,8 @@ App::post('/v1/users') ->inject('hooks') ->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { $user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $queueForEvents, $hooks); + //Todo debug (to be removed laster @shimon) + //Console::log('@create user=' . time() . '=' . $user->getId()); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -1191,6 +1194,9 @@ App::delete('/v1/users/:userId') $dbForProject->deleteDocument('users', $userId); + //Todo debug (to be removed laster @shimon) + //Console::log('@delete user=' . $userId . '=' . time() . '=' . $user->getId()); + $queueForDeletes ->setType(DELETE_TYPE_DOCUMENT) ->setDocument($clone); diff --git a/app/worker.php b/app/worker.php index 1b1f4b9f9..4cf0edbae 100644 --- a/app/worker.php +++ b/app/worker.php @@ -14,6 +14,7 @@ use Appwrite\Event\Mail; use Appwrite\Event\Migration; use Appwrite\Event\Phone; use Appwrite\Event\Usage; +use Appwrite\Event\UsageDump; use Appwrite\Platform\Appwrite; use Swoole\Runtime; use Utopia\App; @@ -146,6 +147,9 @@ Server::setResource('log', fn() => new Log()); Server::setResource('queueForUsage', function (Connection $queue) { return new Usage($queue); }, ['queue']); +Server::setResource('queueForUsageDump', function (Connection $queue) { + return new UsageDump($queue); +}, ['queue']); Server::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); @@ -299,12 +303,9 @@ $worker Console::error('[Error] Line: ' . $error->getLine()); }); -try { - $workerStart = $worker->getWorkerStart(); -} catch (\Throwable $error) { - $worker->workerStart(); -} finally { - Console::info("Worker $workerName started"); -} +$worker->workerStart() + ->action(function () use ($workerName) { + Console::info("Worker $workerName started"); + }); $worker->start(); diff --git a/bin/worker-usage-dump b/bin/worker-usage-dump new file mode 100644 index 000000000..43ca87fcb --- /dev/null +++ b/bin/worker-usage-dump @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/worker.php usage-dump $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 5c645e3bc..2133de0dc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -676,6 +676,38 @@ services: - _APP_LOGGING_CONFIG - _APP_USAGE_AGGREGATION_INTERVAL + appwrite-worker-usage-dump: + entrypoint: worker-usage-dump + <<: *x-logging + container_name: appwrite-worker-usage-dump + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_PROVIDER + - _APP_LOGGING_CONFIG + - _APP_USAGE_AGGREGATION_INTERVAL + + appwrite-schedule: entrypoint: schedule <<: *x-logging diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index fc12c5b5b..9f71ef5eb 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -27,6 +27,9 @@ class Event public const USAGE_QUEUE_NAME = 'v1-usage'; public const USAGE_CLASS_NAME = 'UsageV1'; + public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump'; + public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1'; + public const WEBHOOK_QUEUE_NAME = 'v1-webhooks'; public const WEBHOOK_CLASS_NAME = 'WebhooksV1'; diff --git a/src/Appwrite/Event/Usage.php b/src/Appwrite/Event/Usage.php index 398c3319f..4f53c7df1 100644 --- a/src/Appwrite/Event/Usage.php +++ b/src/Appwrite/Event/Usage.php @@ -2,6 +2,7 @@ namespace Appwrite\Event; +use Utopia\CLI\Console; use Utopia\Queue\Client; use Utopia\Queue\Connection; use Utopia\Database\Document; @@ -42,6 +43,14 @@ class Usage extends Event */ public function addMetric(string $key, int $value): self { + //Todo debug (to be removed laster @shimon) +// if ($key === 'users') { +// if ($value < 0) { +// console::log('@negative=' . $value); +// } else { +// console::log('@positive=' . $value); +// } +// } $this->metrics[] = [ 'key' => $key, 'value' => $value, diff --git a/src/Appwrite/Event/UsageDump.php b/src/Appwrite/Event/UsageDump.php new file mode 100644 index 000000000..8f8790884 --- /dev/null +++ b/src/Appwrite/Event/UsageDump.php @@ -0,0 +1,47 @@ +setQueue(Event::USAGE_DUMP_QUEUE_NAME) + ->setClass(Event::USAGE_DUMP_CLASS_NAME); + } + + /** + * Add Stats. + * + * @param array $stats + * @return self + */ + public function setStats(array $stats): self + { + $this->stats = $stats; + + return $this; + } + + /** + * Sends metrics to the usage worker. + * + * @return string|bool + */ + public function trigger(): string|bool + { + $client = new Client($this->queue, $this->connection); + + return $client->enqueue([ + 'stats' => $this->stats, + ]); + } +} diff --git a/src/Appwrite/Platform/Services/Workers.php b/src/Appwrite/Platform/Services/Workers.php index 6573b3124..22e6dcd56 100644 --- a/src/Appwrite/Platform/Services/Workers.php +++ b/src/Appwrite/Platform/Services/Workers.php @@ -14,7 +14,7 @@ use Appwrite\Platform\Workers\Builds; use Appwrite\Platform\Workers\Deletes; use Appwrite\Platform\Workers\Hamster; use Appwrite\Platform\Workers\Usage; -use Appwrite\Platform\Workers\UsageHook; +use Appwrite\Platform\Workers\UsageDump; use Appwrite\Platform\Workers\Migrations; class Workers extends Service @@ -33,7 +33,7 @@ class Workers extends Service ->addAction(Builds::getName(), new Builds()) ->addAction(Deletes::getName(), new Deletes()) ->addAction(Hamster::getName(), new Hamster()) - ->addAction(UsageHook::getName(), new UsageHook()) + ->addAction(UsageDump::getName(), new UsageDump()) ->addAction(Usage::getName(), new Usage()) ->addAction(Migrations::getName(), new Migrations()) diff --git a/src/Appwrite/Platform/Workers/Usage.php b/src/Appwrite/Platform/Workers/Usage.php index 3809d000f..352742799 100644 --- a/src/Appwrite/Platform/Workers/Usage.php +++ b/src/Appwrite/Platform/Workers/Usage.php @@ -4,22 +4,22 @@ namespace Appwrite\Platform\Workers; use Exception; use Utopia\CLI\Console; -use Utopia\Database\Database; -use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Platform\Action; +use Appwrite\Event\UsageDump; use Utopia\Queue\Message; class Usage extends Action { - protected static array $stats = []; - protected array $periods = [ - '1h' => 'Y-m-d H:00', - '1d' => 'Y-m-d 00:00', - 'inf' => '0000-00-00 00:00' - ]; + private array $stats = []; + private int $lastTriggeredTime = 0; + private int $keys = 0; + private const INFINITY_PERIOD = '_inf_'; + private const KEYS_THRESHOLD = 5; + private const KEYS_SENT_THRESHOLD = 60; + + - protected const INFINITY_PERIOD = '_inf_'; public static function getName(): string { return 'usage'; @@ -35,26 +35,29 @@ class Usage extends Action ->desc('Usage worker') ->inject('message') ->inject('getProjectDB') - ->callback(function (Message $message, callable $getProjectDB) { - $this->action($message, $getProjectDB); + ->inject('queueForUsageDump') + ->callback(function (Message $message, callable $getProjectDB, UsageDump $queueForUsageDump) { + $this->action($message, $getProjectDB, $queueForUsageDump); }); + + $this->lastTriggeredTime = time(); } /** * @param Message $message * @param callable $getProjectDB + * @param UsageDump $queueForUsageDump * @return void * @throws \Utopia\Database\Exception * @throws Exception */ - public function action(Message $message, callable $getProjectDB): void + public function action(Message $message, callable $getProjectDB, UsageDump $queueForUsageDump): void { $payload = $message->getPayload() ?? []; if (empty($payload)) { throw new Exception('Missing payload'); } - $payload = $message->getPayload() ?? []; $project = new Document($payload['project'] ?? []); $projectId = $project->getInternalId(); foreach ($payload['reduce'] ?? [] as $document) { @@ -69,17 +72,47 @@ class Usage extends Action getProjectDB: $getProjectDB ); } - self::$stats[$projectId]['project'] = $project; + foreach ($payload['metrics'] ?? [] as $metric) { - if (!isset(self::$stats[$projectId]['keys'][$metric['key']])) { - self::$stats[$projectId]['keys'][$metric['key']] = $metric['value']; + if ($metric['key'] === 'users') { + if ($metric['value'] < 0) { + $this->stats[$metric['key']]['negative'] += $metric['value']; + } else { + $this->stats[$metric['key']]['positive'] += $metric['value']; + } + } + } + + $this->stats[$projectId]['project'] = $project; + foreach ($payload['metrics'] ?? [] as $metric) { + $this->keys++; + if (!isset($this->stats[$projectId]['keys'][$metric['key']])) { + $this->stats[$projectId]['keys'][$metric['key']] = $metric['value']; continue; } - self::$stats[$projectId]['keys'][$metric['key']] += $metric['value']; + + $this->stats[$projectId]['keys'][$metric['key']] += $metric['value']; + } + + // if keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats) + if ( + $this->keys >= self::KEYS_THRESHOLD || + (time() - $this->lastTriggeredTime > self::KEYS_SENT_THRESHOLD && $this->keys > 0) + ) { + $offset = count($this->stats); + $chunk = array_slice($this->stats, 0, $offset, true); + array_splice($this->stats, 0, $offset); + + $queueForUsageDump + ->setStats($chunk) + ->trigger(); + + //$this->stats = []; + $this->keys = 0; + $this->lastTriggeredTime = time(); } } - /** * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files. * When we remove a parent document we need to deduct his children aggregation from the project scope. diff --git a/src/Appwrite/Platform/Workers/UsageDump.php b/src/Appwrite/Platform/Workers/UsageDump.php new file mode 100644 index 000000000..e65aec519 --- /dev/null +++ b/src/Appwrite/Platform/Workers/UsageDump.php @@ -0,0 +1,115 @@ + 'Y-m-d H:00', + '1d' => 'Y-m-d 00:00', + 'inf' => '0000-00-00 00:00' + ]; + + public static function getName(): string + { + return 'usage-dump'; + } + + /** + * @throws \Exception + */ + public function __construct() + { + + $this + ->inject('message') + ->inject('getProjectDB') + ->callback(function (Message $message, callable $getProjectDB) { + $this->action($message, $getProjectDB); + }) + ; + } + + /** + * @param Message $message + * @param callable $getProjectDB + * @return void + * @throws Exception + * @throws \Utopia\Database\Exception + */ + public function action(Message $message, callable $getProjectDB): void + { + + $payload = $message->getPayload() ?? []; + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + foreach ($payload['stats'] ?? [] as $stats) { + $project = new Document($stats['project'] ?? []); + $numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0; + $projectInternalId = $project->getInternalId(); + + if ($numberOfKeys === 0) { + continue; + } + + try { + $dbForProject = $getProjectDB($project); + foreach ($stats['keys'] ?? [] as $key => $value) { + if ($value == 0) { + continue; + } + + foreach ($this->periods as $period => $format) { + $time = 'inf' === $period ? null : date($format, time()); + $id = \md5("{$time}_{$period}_{$key}"); + + try { + $dbForProject->createDocument('stats_v2', new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $key, + 'value' => $value, + 'region' => App::getEnv('_APP_REGION', 'default'), + ])); + } catch (Duplicate $th) { + if ($value < 0) { + var_dump([ + 'id' => $time . '_' . $period . '_' . $key, + 'value' => $value, + ]); + $dbForProject->decreaseDocumentAttribute( + 'stats_v2', + $id, + 'value', + abs($value) + ); + } else { + $dbForProject->increaseDocumentAttribute( + 'stats_v2', + $id, + 'value', + $value + ); + } + } + } + } + } catch (\Exception $e) { + console::error(DateTime::now() . ' ' . $projectInternalId . ' ' . $e->getMessage()); + } + } + } +} diff --git a/src/Appwrite/Platform/Workers/UsageHook.php b/src/Appwrite/Platform/Workers/UsageHook.php deleted file mode 100644 index 4781b1e89..000000000 --- a/src/Appwrite/Platform/Workers/UsageHook.php +++ /dev/null @@ -1,103 +0,0 @@ -setType(Action::TYPE_WORKER_START) - ->inject('register') - ->inject('getProjectDB') - ->callback(function ($register, callable $getProjectDB) { - $this->action($register, $getProjectDB); - }) - ; - } - - /** - * @param $register - * @param $getProjectDB - * @return void - */ - public function action($register, $getProjectDB): void - { - - $interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '60000'); - Timer::tick($interval, function () use ($register, $getProjectDB) { - - $offset = count(self::$stats); - $projects = array_slice(self::$stats, 0, $offset, true); - array_splice(self::$stats, 0, $offset); - foreach ($projects as $data) { - $numberOfKeys = !empty($data['keys']) ? count($data['keys']) : 0; - $projectInternalId = $data['project']->getInternalId(); - $database = $data['project']['database'] ?? ''; - - console::warning('Ticker started ' . DateTime::now()); - - if ($numberOfKeys === 0) { - continue; - } - - try { - $dbForProject = $getProjectDB($data['project']); - foreach ($data['keys'] ?? [] as $key => $value) { - if ($value == 0) { - continue; - } - - foreach ($this->periods as $period => $format) { - $time = 'inf' === $period ? null : date($format, time()); - $id = \md5("{$time}_{$period}_{$key}"); - - try { - $dbForProject->createDocument('stats_v2', new Document([ - '$id' => $id, - 'period' => $period, - 'time' => $time, - 'metric' => $key, - 'value' => $value, - 'region' => App::getEnv('_APP_REGION', 'default'), - ])); - } catch (Duplicate $th) { - if ($value < 0) { - $dbForProject->decreaseDocumentAttribute( - 'stats_v2', - $id, - 'value', - abs($value) - ); - } else { - $dbForProject->increaseDocumentAttribute( - 'stats_v2', - $id, - 'value', - $value - ); - } - } - } - } - } catch (\Exception $e) { - console::error(DateTime::now() . ' ' . $projectInternalId . ' ' . $e->getMessage()); - } - } - }); - } -}