Merge branch 'main' of github.com:appwrite/appwrite into refactor-cache-poc
Conflicts: src/Appwrite/Platform/Workers/UsageHook.php
This commit is contained in:
commit
dac0e2a543
15 changed files with 260 additions and 136 deletions
2
.env
2
.env
|
@ -78,7 +78,7 @@ _APP_MAINTENANCE_RETENTION_CACHE=2592000
|
||||||
_APP_MAINTENANCE_RETENTION_EXECUTION=1209600
|
_APP_MAINTENANCE_RETENTION_EXECUTION=1209600
|
||||||
_APP_MAINTENANCE_RETENTION_ABUSE=86400
|
_APP_MAINTENANCE_RETENTION_ABUSE=86400
|
||||||
_APP_MAINTENANCE_RETENTION_AUDIT=1209600
|
_APP_MAINTENANCE_RETENTION_AUDIT=1209600
|
||||||
_APP_USAGE_AGGREGATION_INTERVAL=60000
|
_APP_USAGE_AGGREGATION_INTERVAL=20
|
||||||
_APP_MAINTENANCE_RETENTION_USAGE_HOURLY=8640000
|
_APP_MAINTENANCE_RETENTION_USAGE_HOURLY=8640000
|
||||||
_APP_MAINTENANCE_RETENTION_SCHEDULES=86400
|
_APP_MAINTENANCE_RETENTION_SCHEDULES=86400
|
||||||
_APP_USAGE_STATS=enabled
|
_APP_USAGE_STATS=enabled
|
||||||
|
|
|
@ -99,7 +99,8 @@ RUN chmod +x /usr/local/bin/doctor && \
|
||||||
chmod +x /usr/local/bin/worker-webhooks && \
|
chmod +x /usr/local/bin/worker-webhooks && \
|
||||||
chmod +x /usr/local/bin/worker-migrations && \
|
chmod +x /usr/local/bin/worker-migrations && \
|
||||||
chmod +x /usr/local/bin/worker-hamster && \
|
chmod +x /usr/local/bin/worker-hamster && \
|
||||||
chmod +x /usr/local/bin/worker-usage
|
chmod +x /usr/local/bin/worker-usage && \
|
||||||
|
chmod +x /usr/local/bin/worker-usage-dump
|
||||||
|
|
||||||
|
|
||||||
# Cloud Executabless
|
# Cloud Executabless
|
||||||
|
|
|
@ -1791,7 +1791,7 @@ App::get('/v1/account/logs')
|
||||||
}
|
}
|
||||||
|
|
||||||
$response->dynamic(new Document([
|
$response->dynamic(new Document([
|
||||||
'total' => $audit->countLogsByUser($user->getId()),
|
'total' => $audit->countLogsByUser($user->getInternalId()),
|
||||||
'logs' => $output,
|
'logs' => $output,
|
||||||
]), Response::MODEL_LOG_LIST);
|
]), Response::MODEL_LOG_LIST);
|
||||||
});
|
});
|
||||||
|
|
|
@ -967,7 +967,7 @@ App::patch('/v1/teams/:teamId/memberships/:membershipId/status')
|
||||||
|
|
||||||
$dbForProject->deleteCachedDocument('users', $user->getId());
|
$dbForProject->deleteCachedDocument('users', $user->getId());
|
||||||
|
|
||||||
$team = Authorization::skip(fn() => $dbForProject->updateDocument('teams', $team->getId(), $team->setAttribute('total', $team->getAttribute('total', 0) + 1)));
|
Authorization::skip(fn() => $dbForProject->increaseDocumentAttribute('teams', $team->getId(), 'total', 1));
|
||||||
|
|
||||||
$queueForEvents
|
$queueForEvents
|
||||||
->setParam('teamId', $team->getId())
|
->setParam('teamId', $team->getId())
|
||||||
|
@ -1047,8 +1047,7 @@ App::delete('/v1/teams/:teamId/memberships/:membershipId')
|
||||||
$dbForProject->deleteCachedDocument('users', $user->getId());
|
$dbForProject->deleteCachedDocument('users', $user->getId());
|
||||||
|
|
||||||
if ($membership->getAttribute('confirm')) { // Count only confirmed members
|
if ($membership->getAttribute('confirm')) { // Count only confirmed members
|
||||||
$team->setAttribute('total', \max($team->getAttribute('total', 0) - 1, 0));
|
Authorization::skip(fn() => $dbForProject->decreaseDocumentAttribute('teams', $team->getId(), 'total', 1, 0));
|
||||||
Authorization::skip(fn() => $dbForProject->updateDocument('teams', $team->getId(), $team));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$queueForEvents
|
$queueForEvents
|
||||||
|
|
|
@ -9,6 +9,7 @@ use Appwrite\Event\Event;
|
||||||
use Appwrite\Network\Validator\Email;
|
use Appwrite\Network\Validator\Email;
|
||||||
use Appwrite\Utopia\Database\Validator\CustomId;
|
use Appwrite\Utopia\Database\Validator\CustomId;
|
||||||
use Appwrite\Utopia\Database\Validator\Queries\Identities;
|
use Appwrite\Utopia\Database\Validator\Queries\Identities;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
use Utopia\Database\Validator\Queries;
|
use Utopia\Database\Validator\Queries;
|
||||||
use Appwrite\Utopia\Database\Validator\Queries\Users;
|
use Appwrite\Utopia\Database\Validator\Queries\Users;
|
||||||
use Utopia\Database\Validator\Query\Limit;
|
use Utopia\Database\Validator\Query\Limit;
|
||||||
|
@ -141,7 +142,6 @@ App::post('/v1/users')
|
||||||
->inject('hooks')
|
->inject('hooks')
|
||||||
->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) {
|
->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) {
|
||||||
$user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $queueForEvents, $hooks);
|
$user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $queueForEvents, $hooks);
|
||||||
|
|
||||||
$response
|
$response
|
||||||
->setStatusCode(Response::STATUS_CODE_CREATED)
|
->setStatusCode(Response::STATUS_CODE_CREATED)
|
||||||
->dynamic($user, Response::MODEL_USER);
|
->dynamic($user, Response::MODEL_USER);
|
||||||
|
@ -615,6 +615,9 @@ App::get('/v1/users/:userId/logs')
|
||||||
|
|
||||||
$output[$i] = new Document([
|
$output[$i] = new Document([
|
||||||
'event' => $log['event'],
|
'event' => $log['event'],
|
||||||
|
'userId' => ID::custom($log['data']['userId']),
|
||||||
|
'userEmail' => $log['data']['userEmail'] ?? null,
|
||||||
|
'userName' => $log['data']['userName'] ?? null,
|
||||||
'ip' => $log['ip'],
|
'ip' => $log['ip'],
|
||||||
'time' => $log['time'],
|
'time' => $log['time'],
|
||||||
'osCode' => $os['osCode'],
|
'osCode' => $os['osCode'],
|
||||||
|
@ -643,7 +646,7 @@ App::get('/v1/users/:userId/logs')
|
||||||
}
|
}
|
||||||
|
|
||||||
$response->dynamic(new Document([
|
$response->dynamic(new Document([
|
||||||
'total' => $audit->countLogsByUser($user->getId()),
|
'total' => $audit->countLogsByUser($user->getInternalId()),
|
||||||
'logs' => $output,
|
'logs' => $output,
|
||||||
]), Response::MODEL_LOG_LIST);
|
]), Response::MODEL_LOG_LIST);
|
||||||
});
|
});
|
||||||
|
|
|
@ -14,6 +14,7 @@ use Appwrite\Event\Mail;
|
||||||
use Appwrite\Event\Migration;
|
use Appwrite\Event\Migration;
|
||||||
use Appwrite\Event\Phone;
|
use Appwrite\Event\Phone;
|
||||||
use Appwrite\Event\Usage;
|
use Appwrite\Event\Usage;
|
||||||
|
use Appwrite\Event\UsageDump;
|
||||||
use Appwrite\Platform\Appwrite;
|
use Appwrite\Platform\Appwrite;
|
||||||
use Swoole\Runtime;
|
use Swoole\Runtime;
|
||||||
use Utopia\App;
|
use Utopia\App;
|
||||||
|
@ -146,6 +147,9 @@ Server::setResource('log', fn() => new Log());
|
||||||
Server::setResource('queueForUsage', function (Connection $queue) {
|
Server::setResource('queueForUsage', function (Connection $queue) {
|
||||||
return new Usage($queue);
|
return new Usage($queue);
|
||||||
}, ['queue']);
|
}, ['queue']);
|
||||||
|
Server::setResource('queueForUsageDump', function (Connection $queue) {
|
||||||
|
return new UsageDump($queue);
|
||||||
|
}, ['queue']);
|
||||||
Server::setResource('queue', function (Group $pools) {
|
Server::setResource('queue', function (Group $pools) {
|
||||||
return $pools->get('queue')->pop()->getResource();
|
return $pools->get('queue')->pop()->getResource();
|
||||||
}, ['pools']);
|
}, ['pools']);
|
||||||
|
@ -299,12 +303,9 @@ $worker
|
||||||
Console::error('[Error] Line: ' . $error->getLine());
|
Console::error('[Error] Line: ' . $error->getLine());
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
$worker->workerStart()
|
||||||
$workerStart = $worker->getWorkerStart();
|
->action(function () use ($workerName) {
|
||||||
} catch (\Throwable $error) {
|
|
||||||
$worker->workerStart();
|
|
||||||
} finally {
|
|
||||||
Console::info("Worker $workerName started");
|
Console::info("Worker $workerName started");
|
||||||
}
|
});
|
||||||
|
|
||||||
$worker->start();
|
$worker->start();
|
||||||
|
|
3
bin/worker-usage-dump
Normal file
3
bin/worker-usage-dump
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
#!/bin/sh
|
||||||
|
|
||||||
|
php /usr/src/code/app/worker.php usage-dump $@
|
|
@ -678,6 +678,38 @@ services:
|
||||||
- _APP_LOGGING_CONFIG
|
- _APP_LOGGING_CONFIG
|
||||||
- _APP_USAGE_AGGREGATION_INTERVAL
|
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||||
|
|
||||||
|
appwrite-worker-usage-dump:
|
||||||
|
entrypoint: worker-usage-dump
|
||||||
|
<<: *x-logging
|
||||||
|
container_name: appwrite-worker-usage-dump
|
||||||
|
image: appwrite-dev
|
||||||
|
networks:
|
||||||
|
- appwrite
|
||||||
|
volumes:
|
||||||
|
- ./app:/usr/src/code/app
|
||||||
|
- ./src:/usr/src/code/src
|
||||||
|
depends_on:
|
||||||
|
- redis
|
||||||
|
- mariadb
|
||||||
|
environment:
|
||||||
|
- _APP_ENV
|
||||||
|
- _APP_WORKER_PER_CORE
|
||||||
|
- _APP_OPENSSL_KEY_V1
|
||||||
|
- _APP_DB_HOST
|
||||||
|
- _APP_DB_PORT
|
||||||
|
- _APP_DB_SCHEMA
|
||||||
|
- _APP_DB_USER
|
||||||
|
- _APP_DB_PASS
|
||||||
|
- _APP_REDIS_HOST
|
||||||
|
- _APP_REDIS_PORT
|
||||||
|
- _APP_REDIS_USER
|
||||||
|
- _APP_REDIS_PASS
|
||||||
|
- _APP_USAGE_STATS
|
||||||
|
- _APP_LOGGING_PROVIDER
|
||||||
|
- _APP_LOGGING_CONFIG
|
||||||
|
- _APP_USAGE_AGGREGATION_INTERVAL
|
||||||
|
|
||||||
|
|
||||||
appwrite-schedule:
|
appwrite-schedule:
|
||||||
entrypoint: schedule
|
entrypoint: schedule
|
||||||
<<: *x-logging
|
<<: *x-logging
|
||||||
|
|
|
@ -27,6 +27,9 @@ class Event
|
||||||
public const USAGE_QUEUE_NAME = 'v1-usage';
|
public const USAGE_QUEUE_NAME = 'v1-usage';
|
||||||
public const USAGE_CLASS_NAME = 'UsageV1';
|
public const USAGE_CLASS_NAME = 'UsageV1';
|
||||||
|
|
||||||
|
public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump';
|
||||||
|
public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1';
|
||||||
|
|
||||||
public const WEBHOOK_QUEUE_NAME = 'v1-webhooks';
|
public const WEBHOOK_QUEUE_NAME = 'v1-webhooks';
|
||||||
public const WEBHOOK_CLASS_NAME = 'WebhooksV1';
|
public const WEBHOOK_CLASS_NAME = 'WebhooksV1';
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
namespace Appwrite\Event;
|
namespace Appwrite\Event;
|
||||||
|
|
||||||
|
use Utopia\CLI\Console;
|
||||||
use Utopia\Queue\Client;
|
use Utopia\Queue\Client;
|
||||||
use Utopia\Queue\Connection;
|
use Utopia\Queue\Connection;
|
||||||
use Utopia\Database\Document;
|
use Utopia\Database\Document;
|
||||||
|
|
47
src/Appwrite/Event/UsageDump.php
Normal file
47
src/Appwrite/Event/UsageDump.php
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Event;
|
||||||
|
|
||||||
|
use Utopia\Queue\Client;
|
||||||
|
use Utopia\Queue\Connection;
|
||||||
|
|
||||||
|
class UsageDump extends Event
|
||||||
|
{
|
||||||
|
protected array $stats;
|
||||||
|
|
||||||
|
public function __construct(protected Connection $connection)
|
||||||
|
{
|
||||||
|
parent::__construct($connection);
|
||||||
|
|
||||||
|
$this
|
||||||
|
->setQueue(Event::USAGE_DUMP_QUEUE_NAME)
|
||||||
|
->setClass(Event::USAGE_DUMP_CLASS_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add Stats.
|
||||||
|
*
|
||||||
|
* @param array $stats
|
||||||
|
* @return self
|
||||||
|
*/
|
||||||
|
public function setStats(array $stats): self
|
||||||
|
{
|
||||||
|
$this->stats = $stats;
|
||||||
|
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends metrics to the usage worker.
|
||||||
|
*
|
||||||
|
* @return string|bool
|
||||||
|
*/
|
||||||
|
public function trigger(): string|bool
|
||||||
|
{
|
||||||
|
$client = new Client($this->queue, $this->connection);
|
||||||
|
|
||||||
|
return $client->enqueue([
|
||||||
|
'stats' => $this->stats,
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,7 +14,7 @@ use Appwrite\Platform\Workers\Builds;
|
||||||
use Appwrite\Platform\Workers\Deletes;
|
use Appwrite\Platform\Workers\Deletes;
|
||||||
use Appwrite\Platform\Workers\Hamster;
|
use Appwrite\Platform\Workers\Hamster;
|
||||||
use Appwrite\Platform\Workers\Usage;
|
use Appwrite\Platform\Workers\Usage;
|
||||||
use Appwrite\Platform\Workers\UsageHook;
|
use Appwrite\Platform\Workers\UsageDump;
|
||||||
use Appwrite\Platform\Workers\Migrations;
|
use Appwrite\Platform\Workers\Migrations;
|
||||||
|
|
||||||
class Workers extends Service
|
class Workers extends Service
|
||||||
|
@ -33,7 +33,7 @@ class Workers extends Service
|
||||||
->addAction(Builds::getName(), new Builds())
|
->addAction(Builds::getName(), new Builds())
|
||||||
->addAction(Deletes::getName(), new Deletes())
|
->addAction(Deletes::getName(), new Deletes())
|
||||||
->addAction(Hamster::getName(), new Hamster())
|
->addAction(Hamster::getName(), new Hamster())
|
||||||
->addAction(UsageHook::getName(), new UsageHook())
|
->addAction(UsageDump::getName(), new UsageDump())
|
||||||
->addAction(Usage::getName(), new Usage())
|
->addAction(Usage::getName(), new Usage())
|
||||||
->addAction(Migrations::getName(), new Migrations())
|
->addAction(Migrations::getName(), new Migrations())
|
||||||
|
|
||||||
|
|
|
@ -3,23 +3,23 @@
|
||||||
namespace Appwrite\Platform\Workers;
|
namespace Appwrite\Platform\Workers;
|
||||||
|
|
||||||
use Exception;
|
use Exception;
|
||||||
|
use Utopia\App;
|
||||||
use Utopia\CLI\Console;
|
use Utopia\CLI\Console;
|
||||||
use Utopia\Database\Database;
|
|
||||||
use Utopia\Database\DateTime;
|
use Utopia\Database\DateTime;
|
||||||
use Utopia\Database\Document;
|
use Utopia\Database\Document;
|
||||||
use Utopia\Platform\Action;
|
use Utopia\Platform\Action;
|
||||||
|
use Appwrite\Event\UsageDump;
|
||||||
use Utopia\Queue\Message;
|
use Utopia\Queue\Message;
|
||||||
|
|
||||||
class Usage extends Action
|
class Usage extends Action
|
||||||
{
|
{
|
||||||
protected static array $stats = [];
|
private array $stats = [];
|
||||||
protected array $periods = [
|
private int $lastTriggeredTime = 0;
|
||||||
'1h' => 'Y-m-d H:00',
|
private int $keys = 0;
|
||||||
'1d' => 'Y-m-d 00:00',
|
private const INFINITY_PERIOD = '_inf_';
|
||||||
'inf' => '0000-00-00 00:00'
|
private const KEYS_THRESHOLD = 10000;
|
||||||
];
|
|
||||||
|
|
||||||
protected const INFINITY_PERIOD = '_inf_';
|
|
||||||
public static function getName(): string
|
public static function getName(): string
|
||||||
{
|
{
|
||||||
return 'usage';
|
return 'usage';
|
||||||
|
@ -35,26 +35,31 @@ class Usage extends Action
|
||||||
->desc('Usage worker')
|
->desc('Usage worker')
|
||||||
->inject('message')
|
->inject('message')
|
||||||
->inject('getProjectDB')
|
->inject('getProjectDB')
|
||||||
->callback(function (Message $message, callable $getProjectDB) {
|
->inject('queueForUsageDump')
|
||||||
$this->action($message, $getProjectDB);
|
->callback(function (Message $message, callable $getProjectDB, UsageDump $queueForUsageDump) {
|
||||||
|
$this->action($message, $getProjectDB, $queueForUsageDump);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
$this->lastTriggeredTime = time();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param Message $message
|
* @param Message $message
|
||||||
* @param callable $getProjectDB
|
* @param callable $getProjectDB
|
||||||
|
* @param UsageDump $queueForUsageDump
|
||||||
* @return void
|
* @return void
|
||||||
* @throws \Utopia\Database\Exception
|
* @throws \Utopia\Database\Exception
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public function action(Message $message, callable $getProjectDB): void
|
public function action(Message $message, callable $getProjectDB, UsageDump $queueForUsageDump): void
|
||||||
{
|
{
|
||||||
$payload = $message->getPayload() ?? [];
|
$payload = $message->getPayload() ?? [];
|
||||||
if (empty($payload)) {
|
if (empty($payload)) {
|
||||||
throw new Exception('Missing payload');
|
throw new Exception('Missing payload');
|
||||||
}
|
}
|
||||||
|
//Todo Figure out way to preserve keys when the container is being recreated @shimonewman
|
||||||
|
|
||||||
$payload = $message->getPayload() ?? [];
|
$aggregationInterval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20');
|
||||||
$project = new Document($payload['project'] ?? []);
|
$project = new Document($payload['project'] ?? []);
|
||||||
$projectId = $project->getInternalId();
|
$projectId = $project->getInternalId();
|
||||||
foreach ($payload['reduce'] ?? [] as $document) {
|
foreach ($payload['reduce'] ?? [] as $document) {
|
||||||
|
@ -69,13 +74,32 @@ class Usage extends Action
|
||||||
getProjectDB: $getProjectDB
|
getProjectDB: $getProjectDB
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
self::$stats[$projectId]['project'] = $project;
|
|
||||||
|
$this->stats[$projectId]['project'] = $project;
|
||||||
foreach ($payload['metrics'] ?? [] as $metric) {
|
foreach ($payload['metrics'] ?? [] as $metric) {
|
||||||
if (!isset(self::$stats[$projectId]['keys'][$metric['key']])) {
|
$this->keys++;
|
||||||
self::$stats[$projectId]['keys'][$metric['key']] = $metric['value'];
|
if (!isset($this->stats[$projectId]['keys'][$metric['key']])) {
|
||||||
|
$this->stats[$projectId]['keys'][$metric['key']] = $metric['value'];
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
self::$stats[$projectId]['keys'][$metric['key']] += $metric['value'];
|
|
||||||
|
$this->stats[$projectId]['keys'][$metric['key']] += $metric['value'];
|
||||||
|
}
|
||||||
|
|
||||||
|
// if keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats)
|
||||||
|
if (
|
||||||
|
$this->keys >= self::KEYS_THRESHOLD ||
|
||||||
|
(time() - $this->lastTriggeredTime > $aggregationInterval && $this->keys > 0)
|
||||||
|
) {
|
||||||
|
console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys');
|
||||||
|
|
||||||
|
$queueForUsageDump
|
||||||
|
->setStats($this->stats)
|
||||||
|
->trigger();
|
||||||
|
|
||||||
|
$this->stats = [];
|
||||||
|
$this->keys = 0;
|
||||||
|
$this->lastTriggeredTime = time();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
113
src/Appwrite/Platform/Workers/UsageDump.php
Normal file
113
src/Appwrite/Platform/Workers/UsageDump.php
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Appwrite\Platform\Workers;
|
||||||
|
|
||||||
|
use Appwrite\Extend\Exception;
|
||||||
|
use Utopia\App;
|
||||||
|
use Utopia\Database\Document;
|
||||||
|
use Utopia\Database\Exception\Duplicate;
|
||||||
|
use Utopia\Platform\Action;
|
||||||
|
use Utopia\CLI\Console;
|
||||||
|
use Utopia\Database\DateTime;
|
||||||
|
use Utopia\Queue\Message;
|
||||||
|
|
||||||
|
class UsageDump extends Action
|
||||||
|
{
|
||||||
|
protected array $stats = [];
|
||||||
|
protected array $periods = [
|
||||||
|
'1h' => 'Y-m-d H:00',
|
||||||
|
'1d' => 'Y-m-d 00:00',
|
||||||
|
'inf' => '0000-00-00 00:00'
|
||||||
|
];
|
||||||
|
|
||||||
|
public static function getName(): string
|
||||||
|
{
|
||||||
|
return 'usage-dump';
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws \Exception
|
||||||
|
*/
|
||||||
|
public function __construct()
|
||||||
|
{
|
||||||
|
|
||||||
|
$this
|
||||||
|
->inject('message')
|
||||||
|
->inject('getProjectDB')
|
||||||
|
->callback(function (Message $message, callable $getProjectDB) {
|
||||||
|
$this->action($message, $getProjectDB);
|
||||||
|
})
|
||||||
|
;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Message $message
|
||||||
|
* @param callable $getProjectDB
|
||||||
|
* @return void
|
||||||
|
* @throws Exception
|
||||||
|
* @throws \Utopia\Database\Exception
|
||||||
|
*/
|
||||||
|
public function action(Message $message, callable $getProjectDB): void
|
||||||
|
{
|
||||||
|
|
||||||
|
$payload = $message->getPayload() ?? [];
|
||||||
|
if (empty($payload)) {
|
||||||
|
throw new Exception('Missing payload');
|
||||||
|
}
|
||||||
|
|
||||||
|
//Todo rename both usage workers @shimonewman
|
||||||
|
foreach ($payload['stats'] ?? [] as $stats) {
|
||||||
|
$project = new Document($stats['project'] ?? []);
|
||||||
|
$numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
|
||||||
|
|
||||||
|
if ($numberOfKeys === 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
console::log('[' . DateTime::now() . '] ProjectId [' . $project->getInternalId() . '] Database [' . $project['database'] . '] ' . $numberOfKeys . ' keys');
|
||||||
|
|
||||||
|
try {
|
||||||
|
$dbForProject = $getProjectDB($project);
|
||||||
|
foreach ($stats['keys'] ?? [] as $key => $value) {
|
||||||
|
if ($value == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach ($this->periods as $period => $format) {
|
||||||
|
$time = 'inf' === $period ? null : date($format, time());
|
||||||
|
$id = \md5("{$time}_{$period}_{$key}");
|
||||||
|
|
||||||
|
try {
|
||||||
|
$dbForProject->createDocument('stats_v2', new Document([
|
||||||
|
'$id' => $id,
|
||||||
|
'period' => $period,
|
||||||
|
'time' => $time,
|
||||||
|
'metric' => $key,
|
||||||
|
'value' => $value,
|
||||||
|
'region' => App::getEnv('_APP_REGION', 'default'),
|
||||||
|
]));
|
||||||
|
} catch (Duplicate $th) {
|
||||||
|
if ($value < 0) {
|
||||||
|
$dbForProject->decreaseDocumentAttribute(
|
||||||
|
'stats_v2',
|
||||||
|
$id,
|
||||||
|
'value',
|
||||||
|
abs($value)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
$dbForProject->increaseDocumentAttribute(
|
||||||
|
'stats_v2',
|
||||||
|
$id,
|
||||||
|
'value',
|
||||||
|
$value
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (\Exception $e) {
|
||||||
|
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,103 +0,0 @@
|
||||||
<?php
|
|
||||||
|
|
||||||
namespace Appwrite\Platform\Workers;
|
|
||||||
|
|
||||||
use Utopia\App;
|
|
||||||
use Utopia\Database\Document;
|
|
||||||
use Utopia\Database\Exception\Duplicate;
|
|
||||||
use Utopia\Platform\Action;
|
|
||||||
use Utopia\CLI\Console;
|
|
||||||
use Swoole\Timer;
|
|
||||||
use Utopia\Database\DateTime;
|
|
||||||
|
|
||||||
class UsageHook extends Usage
|
|
||||||
{
|
|
||||||
public static function getName(): string
|
|
||||||
{
|
|
||||||
return 'usageHook';
|
|
||||||
}
|
|
||||||
|
|
||||||
public function __construct()
|
|
||||||
{
|
|
||||||
|
|
||||||
$this
|
|
||||||
->setType(Action::TYPE_WORKER_START)
|
|
||||||
->inject('register')
|
|
||||||
->inject('getProjectDB')
|
|
||||||
->callback(function ($register, callable $getProjectDB) {
|
|
||||||
$this->action($register, $getProjectDB);
|
|
||||||
})
|
|
||||||
;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param $register
|
|
||||||
* @param $getProjectDB
|
|
||||||
* @return void
|
|
||||||
*/
|
|
||||||
public function action($register, $getProjectDB): void
|
|
||||||
{
|
|
||||||
|
|
||||||
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '60000');
|
|
||||||
Timer::tick($interval, function () use ($register, $getProjectDB) {
|
|
||||||
|
|
||||||
$offset = count(self::$stats);
|
|
||||||
$projects = array_slice(self::$stats, 0, $offset, true);
|
|
||||||
array_splice(self::$stats, 0, $offset);
|
|
||||||
foreach ($projects as $data) {
|
|
||||||
$numberOfKeys = !empty($data['keys']) ? count($data['keys']) : 0;
|
|
||||||
$projectInternalId = $data['project']->getInternalId();
|
|
||||||
$database = $data['project']['database'] ?? '';
|
|
||||||
|
|
||||||
console::warning('Ticker started ' . DateTime::now());
|
|
||||||
|
|
||||||
if ($numberOfKeys === 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
$dbForProject = $getProjectDB($data['project']);
|
|
||||||
foreach ($data['keys'] ?? [] as $key => $value) {
|
|
||||||
if ($value == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach ($this->periods as $period => $format) {
|
|
||||||
$time = 'inf' === $period ? null : date($format, time());
|
|
||||||
$id = \md5("{$time}_{$period}_{$key}");
|
|
||||||
|
|
||||||
try {
|
|
||||||
$dbForProject->createDocument('stats', new Document([
|
|
||||||
'$id' => $id,
|
|
||||||
'period' => $period,
|
|
||||||
'time' => $time,
|
|
||||||
'metric' => $key,
|
|
||||||
'value' => $value,
|
|
||||||
'region' => App::getEnv('_APP_REGION', 'default'),
|
|
||||||
]));
|
|
||||||
} catch (Duplicate $th) {
|
|
||||||
if ($value < 0) {
|
|
||||||
$dbForProject->decreaseDocumentAttribute(
|
|
||||||
'stats',
|
|
||||||
$id,
|
|
||||||
'value',
|
|
||||||
abs($value)
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
$dbForProject->increaseDocumentAttribute(
|
|
||||||
'stats',
|
|
||||||
$id,
|
|
||||||
'value',
|
|
||||||
$value
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
console::error(DateTime::now() . ' ' . $projectInternalId . ' ' . $e->getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue