diff --git a/.env b/.env index 0247356a25..b915f91516 100644 --- a/.env +++ b/.env @@ -78,7 +78,7 @@ _APP_MAINTENANCE_RETENTION_CACHE=2592000 _APP_MAINTENANCE_RETENTION_EXECUTION=1209600 _APP_MAINTENANCE_RETENTION_ABUSE=86400 _APP_MAINTENANCE_RETENTION_AUDIT=1209600 -_APP_USAGE_AGGREGATION_INTERVAL=60000 +_APP_USAGE_AGGREGATION_INTERVAL=20 _APP_MAINTENANCE_RETENTION_USAGE_HOURLY=8640000 _APP_MAINTENANCE_RETENTION_SCHEDULES=86400 _APP_USAGE_STATS=enabled diff --git a/Dockerfile b/Dockerfile index 9f3ee9b700..5d06d6c828 100755 --- a/Dockerfile +++ b/Dockerfile @@ -99,7 +99,8 @@ RUN chmod +x /usr/local/bin/doctor && \ chmod +x /usr/local/bin/worker-webhooks && \ chmod +x /usr/local/bin/worker-migrations && \ chmod +x /usr/local/bin/worker-hamster && \ - chmod +x /usr/local/bin/worker-usage + chmod +x /usr/local/bin/worker-usage && \ + chmod +x /usr/local/bin/worker-usage-dump # Cloud Executabless diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index 68d2a544d8..17b2984471 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -1791,7 +1791,7 @@ App::get('/v1/account/logs') } $response->dynamic(new Document([ - 'total' => $audit->countLogsByUser($user->getId()), + 'total' => $audit->countLogsByUser($user->getInternalId()), 'logs' => $output, ]), Response::MODEL_LOG_LIST); }); diff --git a/app/controllers/api/teams.php b/app/controllers/api/teams.php index a374c57cf1..2801e45bb5 100644 --- a/app/controllers/api/teams.php +++ b/app/controllers/api/teams.php @@ -967,7 +967,7 @@ App::patch('/v1/teams/:teamId/memberships/:membershipId/status') $dbForProject->deleteCachedDocument('users', $user->getId()); - $team = Authorization::skip(fn() => $dbForProject->updateDocument('teams', $team->getId(), $team->setAttribute('total', $team->getAttribute('total', 0) + 1))); + Authorization::skip(fn() => $dbForProject->increaseDocumentAttribute('teams', $team->getId(), 'total', 1)); $queueForEvents ->setParam('teamId', $team->getId()) @@ -1047,8 +1047,7 @@ App::delete('/v1/teams/:teamId/memberships/:membershipId') $dbForProject->deleteCachedDocument('users', $user->getId()); if ($membership->getAttribute('confirm')) { // Count only confirmed members - $team->setAttribute('total', \max($team->getAttribute('total', 0) - 1, 0)); - Authorization::skip(fn() => $dbForProject->updateDocument('teams', $team->getId(), $team)); + Authorization::skip(fn() => $dbForProject->decreaseDocumentAttribute('teams', $team->getId(), 'total', 1, 0)); } $queueForEvents diff --git a/app/controllers/api/users.php b/app/controllers/api/users.php index 06b446a110..bbeef96f14 100644 --- a/app/controllers/api/users.php +++ b/app/controllers/api/users.php @@ -9,6 +9,7 @@ use Appwrite\Event\Event; use Appwrite\Network\Validator\Email; use Appwrite\Utopia\Database\Validator\CustomId; use Appwrite\Utopia\Database\Validator\Queries\Identities; +use Utopia\CLI\Console; use Utopia\Database\Validator\Queries; use Appwrite\Utopia\Database\Validator\Queries\Users; use Utopia\Database\Validator\Query\Limit; @@ -141,7 +142,6 @@ App::post('/v1/users') ->inject('hooks') ->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { $user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $queueForEvents, $hooks); - $response ->setStatusCode(Response::STATUS_CODE_CREATED) ->dynamic($user, Response::MODEL_USER); @@ -615,6 +615,9 @@ App::get('/v1/users/:userId/logs') $output[$i] = new Document([ 'event' => $log['event'], + 'userId' => ID::custom($log['data']['userId']), + 'userEmail' => $log['data']['userEmail'] ?? null, + 'userName' => $log['data']['userName'] ?? null, 'ip' => $log['ip'], 'time' => $log['time'], 'osCode' => $os['osCode'], @@ -643,7 +646,7 @@ App::get('/v1/users/:userId/logs') } $response->dynamic(new Document([ - 'total' => $audit->countLogsByUser($user->getId()), + 'total' => $audit->countLogsByUser($user->getInternalId()), 'logs' => $output, ]), Response::MODEL_LOG_LIST); }); diff --git a/app/worker.php b/app/worker.php index 1b1f4b9f93..4cf0edbae6 100644 --- a/app/worker.php +++ b/app/worker.php @@ -14,6 +14,7 @@ use Appwrite\Event\Mail; use Appwrite\Event\Migration; use Appwrite\Event\Phone; use Appwrite\Event\Usage; +use Appwrite\Event\UsageDump; use Appwrite\Platform\Appwrite; use Swoole\Runtime; use Utopia\App; @@ -146,6 +147,9 @@ Server::setResource('log', fn() => new Log()); Server::setResource('queueForUsage', function (Connection $queue) { return new Usage($queue); }, ['queue']); +Server::setResource('queueForUsageDump', function (Connection $queue) { + return new UsageDump($queue); +}, ['queue']); Server::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); @@ -299,12 +303,9 @@ $worker Console::error('[Error] Line: ' . $error->getLine()); }); -try { - $workerStart = $worker->getWorkerStart(); -} catch (\Throwable $error) { - $worker->workerStart(); -} finally { - Console::info("Worker $workerName started"); -} +$worker->workerStart() + ->action(function () use ($workerName) { + Console::info("Worker $workerName started"); + }); $worker->start(); diff --git a/bin/worker-usage-dump b/bin/worker-usage-dump new file mode 100644 index 0000000000..43ca87fcb3 --- /dev/null +++ b/bin/worker-usage-dump @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/worker.php usage-dump $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index e8433608d8..de71e3937a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -678,6 +678,38 @@ services: - _APP_LOGGING_CONFIG - _APP_USAGE_AGGREGATION_INTERVAL + appwrite-worker-usage-dump: + entrypoint: worker-usage-dump + <<: *x-logging + container_name: appwrite-worker-usage-dump + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_PROVIDER + - _APP_LOGGING_CONFIG + - _APP_USAGE_AGGREGATION_INTERVAL + + appwrite-schedule: entrypoint: schedule <<: *x-logging diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index fc12c5b5b3..9f71ef5ebc 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -27,6 +27,9 @@ class Event public const USAGE_QUEUE_NAME = 'v1-usage'; public const USAGE_CLASS_NAME = 'UsageV1'; + public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump'; + public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1'; + public const WEBHOOK_QUEUE_NAME = 'v1-webhooks'; public const WEBHOOK_CLASS_NAME = 'WebhooksV1'; diff --git a/src/Appwrite/Event/Usage.php b/src/Appwrite/Event/Usage.php index 398c3319f2..ded276e166 100644 --- a/src/Appwrite/Event/Usage.php +++ b/src/Appwrite/Event/Usage.php @@ -2,6 +2,7 @@ namespace Appwrite\Event; +use Utopia\CLI\Console; use Utopia\Queue\Client; use Utopia\Queue\Connection; use Utopia\Database\Document; diff --git a/src/Appwrite/Event/UsageDump.php b/src/Appwrite/Event/UsageDump.php new file mode 100644 index 0000000000..8f87908849 --- /dev/null +++ b/src/Appwrite/Event/UsageDump.php @@ -0,0 +1,47 @@ +setQueue(Event::USAGE_DUMP_QUEUE_NAME) + ->setClass(Event::USAGE_DUMP_CLASS_NAME); + } + + /** + * Add Stats. + * + * @param array $stats + * @return self + */ + public function setStats(array $stats): self + { + $this->stats = $stats; + + return $this; + } + + /** + * Sends metrics to the usage worker. + * + * @return string|bool + */ + public function trigger(): string|bool + { + $client = new Client($this->queue, $this->connection); + + return $client->enqueue([ + 'stats' => $this->stats, + ]); + } +} diff --git a/src/Appwrite/Platform/Services/Workers.php b/src/Appwrite/Platform/Services/Workers.php index 6573b3124c..22e6dcd56a 100644 --- a/src/Appwrite/Platform/Services/Workers.php +++ b/src/Appwrite/Platform/Services/Workers.php @@ -14,7 +14,7 @@ use Appwrite\Platform\Workers\Builds; use Appwrite\Platform\Workers\Deletes; use Appwrite\Platform\Workers\Hamster; use Appwrite\Platform\Workers\Usage; -use Appwrite\Platform\Workers\UsageHook; +use Appwrite\Platform\Workers\UsageDump; use Appwrite\Platform\Workers\Migrations; class Workers extends Service @@ -33,7 +33,7 @@ class Workers extends Service ->addAction(Builds::getName(), new Builds()) ->addAction(Deletes::getName(), new Deletes()) ->addAction(Hamster::getName(), new Hamster()) - ->addAction(UsageHook::getName(), new UsageHook()) + ->addAction(UsageDump::getName(), new UsageDump()) ->addAction(Usage::getName(), new Usage()) ->addAction(Migrations::getName(), new Migrations()) diff --git a/src/Appwrite/Platform/Workers/Usage.php b/src/Appwrite/Platform/Workers/Usage.php index 13c5a8bff0..c2e7e9d6c7 100644 --- a/src/Appwrite/Platform/Workers/Usage.php +++ b/src/Appwrite/Platform/Workers/Usage.php @@ -3,23 +3,23 @@ namespace Appwrite\Platform\Workers; use Exception; +use Utopia\App; use Utopia\CLI\Console; -use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Platform\Action; +use Appwrite\Event\UsageDump; use Utopia\Queue\Message; class Usage extends Action { - protected static array $stats = []; - protected array $periods = [ - '1h' => 'Y-m-d H:00', - '1d' => 'Y-m-d 00:00', - 'inf' => '0000-00-00 00:00' - ]; + private array $stats = []; + private int $lastTriggeredTime = 0; + private int $keys = 0; + private const INFINITY_PERIOD = '_inf_'; + private const KEYS_THRESHOLD = 10000; + - protected const INFINITY_PERIOD = '_inf_'; public static function getName(): string { return 'usage'; @@ -35,26 +35,31 @@ class Usage extends Action ->desc('Usage worker') ->inject('message') ->inject('getProjectDB') - ->callback(function (Message $message, callable $getProjectDB) { - $this->action($message, $getProjectDB); + ->inject('queueForUsageDump') + ->callback(function (Message $message, callable $getProjectDB, UsageDump $queueForUsageDump) { + $this->action($message, $getProjectDB, $queueForUsageDump); }); + + $this->lastTriggeredTime = time(); } /** * @param Message $message * @param callable $getProjectDB + * @param UsageDump $queueForUsageDump * @return void * @throws \Utopia\Database\Exception * @throws Exception */ - public function action(Message $message, callable $getProjectDB): void + public function action(Message $message, callable $getProjectDB, UsageDump $queueForUsageDump): void { $payload = $message->getPayload() ?? []; if (empty($payload)) { throw new Exception('Missing payload'); } + //Todo Figure out way to preserve keys when the container is being recreated @shimonewman - $payload = $message->getPayload() ?? []; + $aggregationInterval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20'); $project = new Document($payload['project'] ?? []); $projectId = $project->getInternalId(); foreach ($payload['reduce'] ?? [] as $document) { @@ -69,13 +74,32 @@ class Usage extends Action getProjectDB: $getProjectDB ); } - self::$stats[$projectId]['project'] = $project; + + $this->stats[$projectId]['project'] = $project; foreach ($payload['metrics'] ?? [] as $metric) { - if (!isset(self::$stats[$projectId]['keys'][$metric['key']])) { - self::$stats[$projectId]['keys'][$metric['key']] = $metric['value']; + $this->keys++; + if (!isset($this->stats[$projectId]['keys'][$metric['key']])) { + $this->stats[$projectId]['keys'][$metric['key']] = $metric['value']; continue; } - self::$stats[$projectId]['keys'][$metric['key']] += $metric['value']; + + $this->stats[$projectId]['keys'][$metric['key']] += $metric['value']; + } + + // if keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats) + if ( + $this->keys >= self::KEYS_THRESHOLD || + (time() - $this->lastTriggeredTime > $aggregationInterval && $this->keys > 0) + ) { + console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys'); + + $queueForUsageDump + ->setStats($this->stats) + ->trigger(); + + $this->stats = []; + $this->keys = 0; + $this->lastTriggeredTime = time(); } } diff --git a/src/Appwrite/Platform/Workers/UsageDump.php b/src/Appwrite/Platform/Workers/UsageDump.php new file mode 100644 index 0000000000..f563578984 --- /dev/null +++ b/src/Appwrite/Platform/Workers/UsageDump.php @@ -0,0 +1,113 @@ + 'Y-m-d H:00', + '1d' => 'Y-m-d 00:00', + 'inf' => '0000-00-00 00:00' + ]; + + public static function getName(): string + { + return 'usage-dump'; + } + + /** + * @throws \Exception + */ + public function __construct() + { + + $this + ->inject('message') + ->inject('getProjectDB') + ->callback(function (Message $message, callable $getProjectDB) { + $this->action($message, $getProjectDB); + }) + ; + } + + /** + * @param Message $message + * @param callable $getProjectDB + * @return void + * @throws Exception + * @throws \Utopia\Database\Exception + */ + public function action(Message $message, callable $getProjectDB): void + { + + $payload = $message->getPayload() ?? []; + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + //Todo rename both usage workers @shimonewman + foreach ($payload['stats'] ?? [] as $stats) { + $project = new Document($stats['project'] ?? []); + $numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0; + + if ($numberOfKeys === 0) { + continue; + } + + console::log('[' . DateTime::now() . '] ProjectId [' . $project->getInternalId() . '] Database [' . $project['database'] . '] ' . $numberOfKeys . ' keys'); + + try { + $dbForProject = $getProjectDB($project); + foreach ($stats['keys'] ?? [] as $key => $value) { + if ($value == 0) { + continue; + } + + foreach ($this->periods as $period => $format) { + $time = 'inf' === $period ? null : date($format, time()); + $id = \md5("{$time}_{$period}_{$key}"); + + try { + $dbForProject->createDocument('stats_v2', new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $key, + 'value' => $value, + 'region' => App::getEnv('_APP_REGION', 'default'), + ])); + } catch (Duplicate $th) { + if ($value < 0) { + $dbForProject->decreaseDocumentAttribute( + 'stats_v2', + $id, + 'value', + abs($value) + ); + } else { + $dbForProject->increaseDocumentAttribute( + 'stats_v2', + $id, + 'value', + $value + ); + } + } + } + } + } catch (\Exception $e) { + console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + } + } + } +} diff --git a/src/Appwrite/Platform/Workers/UsageHook.php b/src/Appwrite/Platform/Workers/UsageHook.php deleted file mode 100644 index 8343b26355..0000000000 --- a/src/Appwrite/Platform/Workers/UsageHook.php +++ /dev/null @@ -1,103 +0,0 @@ -setType(Action::TYPE_WORKER_START) - ->inject('register') - ->inject('getProjectDB') - ->callback(function ($register, callable $getProjectDB) { - $this->action($register, $getProjectDB); - }) - ; - } - - /** - * @param $register - * @param $getProjectDB - * @return void - */ - public function action($register, $getProjectDB): void - { - - $interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '60000'); - Timer::tick($interval, function () use ($register, $getProjectDB) { - - $offset = count(self::$stats); - $projects = array_slice(self::$stats, 0, $offset, true); - array_splice(self::$stats, 0, $offset); - foreach ($projects as $data) { - $numberOfKeys = !empty($data['keys']) ? count($data['keys']) : 0; - $projectInternalId = $data['project']->getInternalId(); - $database = $data['project']['database'] ?? ''; - - console::warning('Ticker started ' . DateTime::now()); - - if ($numberOfKeys === 0) { - continue; - } - - try { - $dbForProject = $getProjectDB($data['project']); - foreach ($data['keys'] ?? [] as $key => $value) { - if ($value == 0) { - continue; - } - - foreach ($this->periods as $period => $format) { - $time = 'inf' === $period ? null : date($format, time()); - $id = \md5("{$time}_{$period}_{$key}"); - - try { - $dbForProject->createDocument('stats', new Document([ - '$id' => $id, - 'period' => $period, - 'time' => $time, - 'metric' => $key, - 'value' => $value, - 'region' => App::getEnv('_APP_REGION', 'default'), - ])); - } catch (Duplicate $th) { - if ($value < 0) { - $dbForProject->decreaseDocumentAttribute( - 'stats', - $id, - 'value', - abs($value) - ); - } else { - $dbForProject->increaseDocumentAttribute( - 'stats', - $id, - 'value', - $value - ); - } - } - } - } - } catch (\Exception $e) { - console::error(DateTime::now() . ' ' . $projectInternalId . ' ' . $e->getMessage()); - } - } - }); - } -}