1
0
Fork 0
mirror of synced 2024-05-17 11:12:41 +12:00

refactor usage poc

This commit is contained in:
shimon 2024-01-28 11:28:59 +02:00
parent b024cb5614
commit 7a2ee683e3
12 changed files with 278 additions and 131 deletions

View file

@ -95,7 +95,8 @@ RUN chmod +x /usr/local/bin/doctor && \
chmod +x /usr/local/bin/worker-webhooks && \
chmod +x /usr/local/bin/worker-migrations && \
chmod +x /usr/local/bin/worker-hamster && \
chmod +x /usr/local/bin/worker-usage
chmod +x /usr/local/bin/worker-usage && \
chmod +x /usr/local/bin/worker-usage-dump
# Cloud Executabless

View file

@ -9,6 +9,7 @@ use Appwrite\Event\Event;
use Appwrite\Network\Validator\Email;
use Appwrite\Utopia\Database\Validator\CustomId;
use Appwrite\Utopia\Database\Validator\Queries\Identities;
use Utopia\CLI\Console;
use Utopia\Database\Validator\Queries;
use Appwrite\Utopia\Database\Validator\Queries\Users;
use Utopia\Database\Validator\Query\Limit;
@ -141,6 +142,8 @@ App::post('/v1/users')
->inject('hooks')
->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) {
$user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $queueForEvents, $hooks);
//Todo debug (to be removed laster @shimon)
//Console::log('@create user=' . time() . '=' . $user->getId());
$response
->setStatusCode(Response::STATUS_CODE_CREATED)
@ -1191,6 +1194,9 @@ App::delete('/v1/users/:userId')
$dbForProject->deleteDocument('users', $userId);
//Todo debug (to be removed laster @shimon)
//Console::log('@delete user=' . $userId . '=' . time() . '=' . $user->getId());
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($clone);

View file

@ -14,6 +14,7 @@ use Appwrite\Event\Mail;
use Appwrite\Event\Migration;
use Appwrite\Event\Phone;
use Appwrite\Event\Usage;
use Appwrite\Event\UsageDump;
use Appwrite\Platform\Appwrite;
use Swoole\Runtime;
use Utopia\App;
@ -146,6 +147,9 @@ Server::setResource('log', fn() => new Log());
Server::setResource('queueForUsage', function (Connection $queue) {
return new Usage($queue);
}, ['queue']);
Server::setResource('queueForUsageDump', function (Connection $queue) {
return new UsageDump($queue);
}, ['queue']);
Server::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
@ -299,12 +303,9 @@ $worker
Console::error('[Error] Line: ' . $error->getLine());
});
try {
$workerStart = $worker->getWorkerStart();
} catch (\Throwable $error) {
$worker->workerStart();
} finally {
Console::info("Worker $workerName started");
}
$worker->workerStart()
->action(function () use ($workerName) {
Console::info("Worker $workerName started");
});
$worker->start();

3
bin/worker-usage-dump Normal file
View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/worker.php usage-dump $@

View file

@ -676,6 +676,38 @@ services:
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
appwrite-worker-usage-dump:
entrypoint: worker-usage-dump
<<: *x-logging
container_name: appwrite-worker-usage-dump
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_USAGE_STATS
- _APP_LOGGING_PROVIDER
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
appwrite-schedule:
entrypoint: schedule
<<: *x-logging

View file

@ -27,6 +27,9 @@ class Event
public const USAGE_QUEUE_NAME = 'v1-usage';
public const USAGE_CLASS_NAME = 'UsageV1';
public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump';
public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1';
public const WEBHOOK_QUEUE_NAME = 'v1-webhooks';
public const WEBHOOK_CLASS_NAME = 'WebhooksV1';

View file

@ -2,6 +2,7 @@
namespace Appwrite\Event;
use Utopia\CLI\Console;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
use Utopia\Database\Document;
@ -42,6 +43,14 @@ class Usage extends Event
*/
public function addMetric(string $key, int $value): self
{
//Todo debug (to be removed laster @shimon)
// if ($key === 'users') {
// if ($value < 0) {
// console::log('@negative=' . $value);
// } else {
// console::log('@positive=' . $value);
// }
// }
$this->metrics[] = [
'key' => $key,
'value' => $value,

View file

@ -0,0 +1,47 @@
<?php
namespace Appwrite\Event;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class UsageDump extends Event
{
protected array $stats;
public function __construct(protected Connection $connection)
{
parent::__construct($connection);
$this
->setQueue(Event::USAGE_DUMP_QUEUE_NAME)
->setClass(Event::USAGE_DUMP_CLASS_NAME);
}
/**
* Add Stats.
*
* @param array $stats
* @return self
*/
public function setStats(array $stats): self
{
$this->stats = $stats;
return $this;
}
/**
* Sends metrics to the usage worker.
*
* @return string|bool
*/
public function trigger(): string|bool
{
$client = new Client($this->queue, $this->connection);
return $client->enqueue([
'stats' => $this->stats,
]);
}
}

View file

@ -14,7 +14,7 @@ use Appwrite\Platform\Workers\Builds;
use Appwrite\Platform\Workers\Deletes;
use Appwrite\Platform\Workers\Hamster;
use Appwrite\Platform\Workers\Usage;
use Appwrite\Platform\Workers\UsageHook;
use Appwrite\Platform\Workers\UsageDump;
use Appwrite\Platform\Workers\Migrations;
class Workers extends Service
@ -33,7 +33,7 @@ class Workers extends Service
->addAction(Builds::getName(), new Builds())
->addAction(Deletes::getName(), new Deletes())
->addAction(Hamster::getName(), new Hamster())
->addAction(UsageHook::getName(), new UsageHook())
->addAction(UsageDump::getName(), new UsageDump())
->addAction(Usage::getName(), new Usage())
->addAction(Migrations::getName(), new Migrations())

View file

@ -4,22 +4,22 @@ namespace Appwrite\Platform\Workers;
use Exception;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Platform\Action;
use Appwrite\Event\UsageDump;
use Utopia\Queue\Message;
class Usage extends Action
{
protected static array $stats = [];
protected array $periods = [
'1h' => 'Y-m-d H:00',
'1d' => 'Y-m-d 00:00',
'inf' => '0000-00-00 00:00'
];
private array $stats = [];
private int $lastTriggeredTime = 0;
private int $keys = 0;
private const INFINITY_PERIOD = '_inf_';
private const KEYS_THRESHOLD = 5;
private const KEYS_SENT_THRESHOLD = 60;
protected const INFINITY_PERIOD = '_inf_';
public static function getName(): string
{
return 'usage';
@ -35,26 +35,29 @@ class Usage extends Action
->desc('Usage worker')
->inject('message')
->inject('getProjectDB')
->callback(function (Message $message, callable $getProjectDB) {
$this->action($message, $getProjectDB);
->inject('queueForUsageDump')
->callback(function (Message $message, callable $getProjectDB, UsageDump $queueForUsageDump) {
$this->action($message, $getProjectDB, $queueForUsageDump);
});
$this->lastTriggeredTime = time();
}
/**
* @param Message $message
* @param callable $getProjectDB
* @param UsageDump $queueForUsageDump
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
public function action(Message $message, callable $getProjectDB): void
public function action(Message $message, callable $getProjectDB, UsageDump $queueForUsageDump): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$payload = $message->getPayload() ?? [];
$project = new Document($payload['project'] ?? []);
$projectId = $project->getInternalId();
foreach ($payload['reduce'] ?? [] as $document) {
@ -69,17 +72,47 @@ class Usage extends Action
getProjectDB: $getProjectDB
);
}
self::$stats[$projectId]['project'] = $project;
foreach ($payload['metrics'] ?? [] as $metric) {
if (!isset(self::$stats[$projectId]['keys'][$metric['key']])) {
self::$stats[$projectId]['keys'][$metric['key']] = $metric['value'];
if ($metric['key'] === 'users') {
if ($metric['value'] < 0) {
$this->stats[$metric['key']]['negative'] += $metric['value'];
} else {
$this->stats[$metric['key']]['positive'] += $metric['value'];
}
}
}
$this->stats[$projectId]['project'] = $project;
foreach ($payload['metrics'] ?? [] as $metric) {
$this->keys++;
if (!isset($this->stats[$projectId]['keys'][$metric['key']])) {
$this->stats[$projectId]['keys'][$metric['key']] = $metric['value'];
continue;
}
self::$stats[$projectId]['keys'][$metric['key']] += $metric['value'];
$this->stats[$projectId]['keys'][$metric['key']] += $metric['value'];
}
// if keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats)
if (
$this->keys >= self::KEYS_THRESHOLD ||
(time() - $this->lastTriggeredTime > self::KEYS_SENT_THRESHOLD && $this->keys > 0)
) {
$offset = count($this->stats);
$chunk = array_slice($this->stats, 0, $offset, true);
array_splice($this->stats, 0, $offset);
$queueForUsageDump
->setStats($chunk)
->trigger();
//$this->stats = [];
$this->keys = 0;
$this->lastTriggeredTime = time();
}
}
/**
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
* When we remove a parent document we need to deduct his children aggregation from the project scope.

View file

@ -0,0 +1,115 @@
<?php
namespace Appwrite\Platform\Workers;
use Appwrite\Extend\Exception;
use Utopia\App;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Queue\Message;
class UsageDump extends Action
{
protected array $stats = [];
protected array $periods = [
'1h' => 'Y-m-d H:00',
'1d' => 'Y-m-d 00:00',
'inf' => '0000-00-00 00:00'
];
public static function getName(): string
{
return 'usage-dump';
}
/**
* @throws \Exception
*/
public function __construct()
{
$this
->inject('message')
->inject('getProjectDB')
->callback(function (Message $message, callable $getProjectDB) {
$this->action($message, $getProjectDB);
})
;
}
/**
* @param Message $message
* @param callable $getProjectDB
* @return void
* @throws Exception
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, callable $getProjectDB): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
foreach ($payload['stats'] ?? [] as $stats) {
$project = new Document($stats['project'] ?? []);
$numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
$projectInternalId = $project->getInternalId();
if ($numberOfKeys === 0) {
continue;
}
try {
$dbForProject = $getProjectDB($project);
foreach ($stats['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
try {
$dbForProject->createDocument('stats_v2', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => App::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
var_dump([
'id' => $time . '_' . $period . '_' . $key,
'value' => $value,
]);
$dbForProject->decreaseDocumentAttribute(
'stats_v2',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats_v2',
$id,
'value',
$value
);
}
}
}
}
} catch (\Exception $e) {
console::error(DateTime::now() . ' ' . $projectInternalId . ' ' . $e->getMessage());
}
}
}
}

View file

@ -1,103 +0,0 @@
<?php
namespace Appwrite\Platform\Workers;
use Utopia\App;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Swoole\Timer;
use Utopia\Database\DateTime;
class UsageHook extends Usage
{
public static function getName(): string
{
return 'usageHook';
}
public function __construct()
{
$this
->setType(Action::TYPE_WORKER_START)
->inject('register')
->inject('getProjectDB')
->callback(function ($register, callable $getProjectDB) {
$this->action($register, $getProjectDB);
})
;
}
/**
* @param $register
* @param $getProjectDB
* @return void
*/
public function action($register, $getProjectDB): void
{
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '60000');
Timer::tick($interval, function () use ($register, $getProjectDB) {
$offset = count(self::$stats);
$projects = array_slice(self::$stats, 0, $offset, true);
array_splice(self::$stats, 0, $offset);
foreach ($projects as $data) {
$numberOfKeys = !empty($data['keys']) ? count($data['keys']) : 0;
$projectInternalId = $data['project']->getInternalId();
$database = $data['project']['database'] ?? '';
console::warning('Ticker started ' . DateTime::now());
if ($numberOfKeys === 0) {
continue;
}
try {
$dbForProject = $getProjectDB($data['project']);
foreach ($data['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
try {
$dbForProject->createDocument('stats_v2', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => App::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats_v2',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats_v2',
$id,
'value',
$value
);
}
}
}
}
} catch (\Exception $e) {
console::error(DateTime::now() . ' ' . $projectInternalId . ' ' . $e->getMessage());
}
}
});
}
}