project stats and database stats
This commit is contained in:
parent
4bab152b3d
commit
8a207f02a0
|
@ -279,7 +279,7 @@ App::get('/v1/projects/:projectId/usage')
|
|||
$database = $client->selectDB('telegraf');
|
||||
|
||||
// Requests
|
||||
$result = $database->query('SELECT sum(value) AS "value" FROM "appwrite_usage_requests_all" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' AND "project"=\'' . $project->getId() . '\' GROUP BY time(' . $period[$range]['group'] . ') FILL(null)');
|
||||
$result = $database->query('SELECT sum(value) AS "value" FROM "appwrite_usage_requests_all" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' AND "projectId"=\'' . $project->getId() . '\' GROUP BY time(' . $period[$range]['group'] . ') FILL(null)');
|
||||
$points = $result->getPoints();
|
||||
|
||||
foreach ($points as $point) {
|
||||
|
@ -290,7 +290,7 @@ App::get('/v1/projects/:projectId/usage')
|
|||
}
|
||||
|
||||
// Network
|
||||
$result = $database->query('SELECT sum(value) AS "value" FROM "appwrite_usage_network_all" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' AND "project"=\'' . $project->getId() . '\' GROUP BY time(' . $period[$range]['group'] . ') FILL(null)');
|
||||
$result = $database->query('SELECT sum(value) AS "value" FROM "appwrite_usage_network_all" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' AND "projectId"=\'' . $project->getId() . '\' GROUP BY time(' . $period[$range]['group'] . ') FILL(null)');
|
||||
$points = $result->getPoints();
|
||||
|
||||
foreach ($points as $point) {
|
||||
|
@ -301,7 +301,7 @@ App::get('/v1/projects/:projectId/usage')
|
|||
}
|
||||
|
||||
// Functions
|
||||
$result = $database->query('SELECT sum(value) AS "value" FROM "appwrite_usage_executions_all" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' AND "project"=\'' . $project->getId() . '\' GROUP BY time(' . $period[$range]['group'] . ') FILL(null)');
|
||||
$result = $database->query('SELECT sum(value) AS "value" FROM "appwrite_usage_executions_all" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' AND "projectId"=\'' . $project->getId() . '\' GROUP BY time(' . $period[$range]['group'] . ') FILL(null)');
|
||||
$points = $result->getPoints();
|
||||
|
||||
foreach ($points as $point) {
|
||||
|
|
|
@ -14,6 +14,34 @@ use Utopia\Database\Document;
|
|||
use Utopia\Database\Query;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
|
||||
/**
|
||||
* 1. Load all the projects
|
||||
* 2. Load latest data entered entered for each project, for each period
|
||||
* 3. Start the loop
|
||||
* 4. Fore each project, for each metric, for each period - sync data
|
||||
*/
|
||||
|
||||
/**
|
||||
* Only succefull operations
|
||||
*
|
||||
* database.collections.CRUD (project=x)
|
||||
* database.documents.CRUD (project=x,collection=y)
|
||||
*
|
||||
* users.CRUD
|
||||
* users.sessions.create (project=x,provider=y)
|
||||
* users.sessions.delete (project=x,provider=y)
|
||||
*
|
||||
* storage.buckets.CRUD (project=x)
|
||||
* storage.files.CRUD (project=x,files=y)
|
||||
*
|
||||
* refactor later
|
||||
* - functions
|
||||
* - realtime
|
||||
* - teams
|
||||
* - webhooks
|
||||
* - keys - really later!
|
||||
*/
|
||||
|
||||
$cli
|
||||
->task('usage')
|
||||
->desc('Schedules syncing data from influxdb to Appwrite console db')
|
||||
|
@ -22,10 +50,90 @@ $cli
|
|||
Console::success(APP_NAME . ' usage sync process v1 has started');
|
||||
|
||||
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30'); //30 seconds
|
||||
$periods = [
|
||||
[
|
||||
'key' => '30m',
|
||||
'startTime' => '-24 hours',
|
||||
],
|
||||
[
|
||||
'key' => '1d',
|
||||
'startTime' => '-90 days',
|
||||
],
|
||||
];
|
||||
|
||||
//use projectId from influxdb instead of iterating over projects from DB
|
||||
|
||||
$globalMetrics = [
|
||||
'requests' => [
|
||||
'method' => 'getGlobalMetrics',
|
||||
'table' => 'appwrite_usage_requests_all',
|
||||
],
|
||||
'network' => [
|
||||
'method' => 'getGlobalMetrics',
|
||||
'table' => 'appwrite_usage_network_all',
|
||||
],
|
||||
'executions' => [
|
||||
'method' => 'getGlobalMetrics',
|
||||
'table' => 'appwrite_usage_executions_all',
|
||||
],
|
||||
'database.collections.create' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_collections_create',
|
||||
],
|
||||
'database.collections.read' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_collections_read',
|
||||
],
|
||||
'database.collections.update' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_collections_update',
|
||||
],
|
||||
'database.collections.delete' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_collections_delete',
|
||||
],
|
||||
'database.documents.create' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_documents_create',
|
||||
],
|
||||
'database.documents.read' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_documents_read',
|
||||
],
|
||||
'database.documents.update' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_documents_update',
|
||||
],
|
||||
'database.documents.delete' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_documents_delete',
|
||||
],
|
||||
'database.documents.collectionId.create' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_documents_create',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.documents.collectionId.read' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_documents_read',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.documents.collectionId.update' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_documents_update',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
'database.documents.collectionId.delete' => [
|
||||
'method' => 'getDatabaseMetrics',
|
||||
'table' => 'appwrite_usage_database_documents_delete',
|
||||
'groupBy' => 'collectionId',
|
||||
],
|
||||
];
|
||||
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
do {
|
||||
do { // connect to db
|
||||
try {
|
||||
$attempts++;
|
||||
$db = $register->get('db');
|
||||
|
@ -46,199 +154,66 @@ $cli
|
|||
$dbForProject = new Database(new MariaDB($db), $cacheAdapter);
|
||||
|
||||
Authorization::disable();
|
||||
$projectIds = [];
|
||||
$latestProject = null;
|
||||
$latestData = [];
|
||||
do {
|
||||
$projects = $dbForConsole->find('projects', [], 100, orderAfter:$latestProject);
|
||||
if (!empty($projects)) {
|
||||
$latestProject = $projects[array_key_last($projects)];
|
||||
$latestData = getLatestData($projects, $latestData, $dbForProject, $projectIds);
|
||||
}
|
||||
} while (!empty($projects));
|
||||
|
||||
$projects = null;
|
||||
|
||||
$firstRun = true;
|
||||
Console::loop(function () use ($interval, $register, &$projectIds, &$latestData, $dbForProject, $dbForConsole, &$firstRun, &$latestProject) {
|
||||
Console::loop(function () use ($interval, $register, &$latestData, $dbForProject, $dbForConsole, &$firstRun, $globalMetrics, $periods) {
|
||||
$time = date('d-m-Y H:i:s', time());
|
||||
Console::info("[{$time}] Syncing usage data from influxdb to Appwrite Console DB every {$interval} seconds");
|
||||
|
||||
if (!$firstRun) {
|
||||
$projects = $dbForConsole->find('projects', limit:100, orderAfter:$latestProject);
|
||||
if (!empty($projects)) {
|
||||
$latestProject = $projects[array_key_last($projects)];
|
||||
$latestData = getLatestData($projects, $latestData, $dbForProject, $projectIds);
|
||||
}
|
||||
}
|
||||
Console::info("[{$time}] Aggregating usage data every {$interval} seconds");
|
||||
|
||||
$client = $register->get('influxdb');
|
||||
if ($client) {
|
||||
foreach ($projectIds as $id => $value) {
|
||||
syncData($client, $id, $latestData, $dbForProject);
|
||||
$database = $client->selectDB('telegraf');
|
||||
// sync data
|
||||
foreach ($globalMetrics as $metric => $options) {
|
||||
foreach ($periods as $period) {
|
||||
$start = DateTime::createFromFormat('U', \strtotime($period['startTime']))->format(DateTime::RFC3339);
|
||||
$end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339);
|
||||
|
||||
$table = $options['table'];
|
||||
$groupBy = $options['groupBy'] ?? '';
|
||||
|
||||
$query = 'SELECT sum(value) AS "value" FROM "' . $table . '" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' GROUP BY time(' . $period['key'] . '), "projectId"' . (empty($groupBy) ? '' : ', "' . $groupBy . '"') . ' FILL(null)';
|
||||
$result = $database->query($query);
|
||||
$points = $result->getPoints();
|
||||
foreach ($points as $point) {
|
||||
$projectId = $point['projectId'];
|
||||
if (!empty($projectId) && $projectId != 'console') {
|
||||
$dbForProject->setNamespace('project_' . $projectId . '_internal');
|
||||
if (!empty($groupBy)) {
|
||||
$groupedBy = $point[$groupBy];
|
||||
if (empty($groupedBy)) {
|
||||
continue;
|
||||
}
|
||||
$metric = str_replace($groupBy, $groupedBy, $metric);
|
||||
}
|
||||
$time = \strtotime($point['time']);
|
||||
$id = \md5($time . '_' . $period['key'] . '_' . $metric);
|
||||
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
||||
try {
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period['key'],
|
||||
'time' => $time,
|
||||
'metric' => $metric,
|
||||
'value' => $value,
|
||||
'type' => 0,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument('stats', $document->getId(),
|
||||
$document->setAttribute('value', $value));
|
||||
}
|
||||
$latestData[$projectId][$metric][$period['key']] = $time;
|
||||
} catch (\Exception$e) {
|
||||
Console::warning("Failed to save data for project {$projectId} and metric {$metric}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
$firstRun = false;
|
||||
}, $interval);
|
||||
});
|
||||
|
||||
function getLatestData(&$projects, &$latestData, $dbForProject, &$projectIds)
|
||||
{
|
||||
foreach ($projects as $project) {
|
||||
$id = $project->getId();
|
||||
$projectIds[$id] = true;
|
||||
$dbForProject->setNamespace("project_{$id}_internal");
|
||||
foreach (['requests', 'network', 'executions', 'database.reads'] as $metric) {
|
||||
$doc = $dbForProject->findOne('stats', [new Query("period", Query::TYPE_EQUAL, ["1d"]), new Query("metric", Query::TYPE_EQUAL, [$metric])], 0, ['time'], [Database::ORDER_DESC]);
|
||||
$latestData[$id][$metric]["1d"] = $doc ? $doc->getAttribute('time') : null;
|
||||
$doc = $dbForProject->findOne('stats', [new Query("period", Query::TYPE_EQUAL, ["30m"])], 0, ['time'], [Database::ORDER_DESC]);
|
||||
$latestData[$id][$metric]["30m"] = $doc ? $doc->getAttribute('time') : null;
|
||||
}
|
||||
}
|
||||
$projects = null;
|
||||
return $latestData;
|
||||
}
|
||||
|
||||
function syncData($client, $projectId, &$latestData, $dbForProject)
|
||||
{
|
||||
foreach (['30m', '1d'] as $period) {
|
||||
$start = DateTime::createFromFormat('U', \strtotime($period == '1d' ? '-90 days' : '-24 hours'))->format(DateTime::RFC3339);
|
||||
$end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339);
|
||||
$database = $client->selectDB('telegraf');
|
||||
$dbForProject->setNamespace("project_{$projectId}_internal");
|
||||
foreach (['requests', 'network', 'executions'] as $metric) {
|
||||
if (!empty($latestData[$projectId][$metric][$period])) {
|
||||
$start = DateTime::createFromFormat('U', $latestData[$projectId][$metric][$period])->format(DateTime::RFC3339);
|
||||
}
|
||||
syncMetric($database, $projectId, $period, 'requests', $start, $end, $dbForProject);
|
||||
}
|
||||
syncMetricPaths($database, $projectId, $period, $start, $end, $dbForProject);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
function syncMetricPaths($database, $projectId, $period, $start, $end, $dbForProject)
|
||||
{
|
||||
$start = DateTime::createFromFormat('U', \strtotime($period == '1d' ? '-7 days' : '-24 hours'))->format(DateTime::RFC3339);
|
||||
if (!empty($latestData[$projectId]['database'][$period])) {
|
||||
$start = DateTime::createFromFormat('U', $latestData[$projectId]['database.reads'][$period])->format(DateTime::RFC3339);
|
||||
}
|
||||
$result = $database->query('SELECT sum(value) AS "value" FROM "appwrite_usage_requests_all" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' AND "project"=\'' . $projectId . '\'GROUP BY time(' . $period . '), "path", "method" FILL(null)');
|
||||
$points = $result->getPoints();
|
||||
|
||||
$databaseMetrics = [
|
||||
'creates' => [
|
||||
'method' => 'post',
|
||||
'paths' => [
|
||||
'/v1/database/collections' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/string' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/email' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/ip' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/url' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/integer' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/float' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/boolean' => true,
|
||||
'/v1/database/collections/*collectionId/indexes' => true,
|
||||
'/v1/database/collections/*collectionId/documents' => true,
|
||||
],
|
||||
],
|
||||
'reads' => [
|
||||
'method' => 'get',
|
||||
'paths' => [
|
||||
'/v1/database/collections' => true,
|
||||
'/v1/database/collections/*collectionId' => true,
|
||||
'/v1/database/collections/*collectionId/attributes' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/*attributeId' => true,
|
||||
'/v1/database/collections/*collectionId/indexes' => true,
|
||||
'/v1/database/collections/*collectionId/indexes/*indexId' => true,
|
||||
'/v1/database/collections/*collectionId/documents' => true,
|
||||
'/v1/database/collections/*collectionId/documents/*documentId' => true,
|
||||
],
|
||||
],
|
||||
'updates' => [
|
||||
'method' => 'put',
|
||||
'paths' => [
|
||||
'/v1/database/collections/*collectionId' => true,
|
||||
'/v1/database/collections/*collectionId/documents/*documentId' => true,
|
||||
],
|
||||
],
|
||||
'deletes' => [
|
||||
'method' => 'delete',
|
||||
'paths' => [
|
||||
'/v1/database/collections/*collectionId' => true,
|
||||
'/v1/database/collections/*collectionId/attributes/*attributeId' => true,
|
||||
'/v1/database/collections/*collectionId/indexes/*indexId' => true,
|
||||
'/v1/database/collections/*collectionId/documents/*documentId' => true,
|
||||
],
|
||||
],
|
||||
];
|
||||
|
||||
$dbStats = [];
|
||||
foreach ($points as $point) {
|
||||
$time = \strtotime($point['time']);
|
||||
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
||||
$path = $point['path'] ?? '';
|
||||
$method = $point['method'] ?? '';
|
||||
|
||||
foreach (['creates', 'reads', 'updates', 'deletes'] as $operation) {
|
||||
if ($method == $databaseMetrics[$operation]['method']
|
||||
&& array_key_exists($path, $databaseMetrics[$operation]['paths'])) {
|
||||
if (empty($dbStats["database.{$operation}"][$period][$time])) {
|
||||
$dbStats["database.{$operation}"][$period][$time] = 0;
|
||||
}
|
||||
$dbStats["database.{$operation}"][$period][$time] += $value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$time = \strtotime($start);
|
||||
foreach ($dbStats as $metric => $stats) {
|
||||
foreach ($stats as $period => $times) {
|
||||
foreach ($times as $time => $value) {
|
||||
$id = \md5($time . '_' . $period . '_' . $metric);
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $metric,
|
||||
'value' => $value,
|
||||
'type' => 0,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument('stats', $document->getId(),
|
||||
$document->setAttribute('value', $value));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
$latestData[$projectId]['database.reads'][$period] = $time;
|
||||
}
|
||||
|
||||
function syncMetric($database, $projectId, $period, $metric, $start, $end, $dbForProject)
|
||||
{
|
||||
$result = $database->query('SELECT sum(value) AS "value" FROM "appwrite_usage_' . $metric . '_all" WHERE time > \'' . $start . '\' AND time < \'' . $end . '\' AND "metric_type"=\'counter\' AND "project"=\'' . $projectId . '\'GROUP BY time(' . $period . ') FILL(null)');
|
||||
$points = $result->getPoints();
|
||||
|
||||
foreach ($points as $point) {
|
||||
$time = \strtotime($point['time']);
|
||||
$id = \md5($time . '_' . $period . '_' . $metric);
|
||||
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period,
|
||||
'time' => $time,
|
||||
'metric' => $metric,
|
||||
'value' => $value,
|
||||
'type' => 0,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument('stats', $document->getId(),
|
||||
$document->setAttribute('value', $value));
|
||||
}
|
||||
$latestData[$projectId][$metric][$period] = $time;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ class Stats
|
|||
$functionExecutionTime = $this->params['functionExecutionTime'] ?? 0;
|
||||
$functionStatus = $this->params['functionStatus'] ?? '';
|
||||
|
||||
$tags = ",project={$projectId},version=" . App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
$tags = ",projectId={$projectId},version=" . App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
// the global namespace is prepended to every key (optional)
|
||||
$this->statsd->setNamespace($this->namespace);
|
||||
|
@ -126,7 +126,7 @@ class Stats
|
|||
foreach ($dbMetrics as $metric) {
|
||||
$value = $this->params[$metric] ?? 0;
|
||||
if ($value >= 1) {
|
||||
$dbTags = ",project={$projectId},collectionId=" . ($this->params['collectionId'] ?? '');
|
||||
$dbTags = ",projectId={$projectId},collectionId=" . ($this->params['collectionId'] ?? '');
|
||||
$this->statsd->increment($metric . $dbTags);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue