seems client check is not required
This commit is contained in:
parent
e479df276f
commit
03c94627ad
1 changed files with 80 additions and 80 deletions
|
@ -236,6 +236,8 @@ $cli
|
|||
$max = 10;
|
||||
$sleep = 1;
|
||||
|
||||
$db = null;
|
||||
$redis = null;
|
||||
do { // connect to db
|
||||
try {
|
||||
$attempts++;
|
||||
|
@ -275,97 +277,95 @@ $cli
|
|||
* @var InfluxDB\Client $client
|
||||
*/
|
||||
$client = $register->get('influxdb');
|
||||
if ($client) {
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
$attempts = 0;
|
||||
$max = 10;
|
||||
$sleep = 1;
|
||||
|
||||
do { // check if telegraf database is ready
|
||||
try {
|
||||
$attempts++;
|
||||
$database = $client->selectDB('telegraf');
|
||||
if(in_array('telegraf', $client->listDatabases())) {
|
||||
break; // leave the do-while if successful
|
||||
}
|
||||
} catch (\Throwable $th) {
|
||||
Console::warning("InfluxDB not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception('InfluxDB database not ready yet');
|
||||
}
|
||||
sleep($sleep);
|
||||
do { // check if telegraf database is ready
|
||||
try {
|
||||
$attempts++;
|
||||
$database = $client->selectDB('telegraf');
|
||||
if(in_array('telegraf', $client->listDatabases())) {
|
||||
break; // leave the do-while if successful
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
} catch (\Throwable $th) {
|
||||
Console::warning("InfluxDB not ready. Retrying connection ({$attempts})...");
|
||||
if ($attempts >= $max) {
|
||||
throw new \Exception('InfluxDB database not ready yet');
|
||||
}
|
||||
sleep($sleep);
|
||||
}
|
||||
} while ($attempts < $max);
|
||||
|
||||
// sync data
|
||||
foreach ($globalMetrics as $metric => $options) { //for each metrics
|
||||
foreach ($periods as $period) { // aggregate data for each period
|
||||
$start = DateTime::createFromFormat('U', \strtotime($period['startTime']))->format(DateTime::RFC3339);
|
||||
if (!empty($latestTime[$metric][$period['key']])) {
|
||||
$start = DateTime::createFromFormat('U', $latestTime[$metric][$period['key']])->format(DateTime::RFC3339);
|
||||
}
|
||||
$end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339);
|
||||
// sync data
|
||||
foreach ($globalMetrics as $metric => $options) { //for each metrics
|
||||
foreach ($periods as $period) { // aggregate data for each period
|
||||
$start = DateTime::createFromFormat('U', \strtotime($period['startTime']))->format(DateTime::RFC3339);
|
||||
if (!empty($latestTime[$metric][$period['key']])) {
|
||||
$start = DateTime::createFromFormat('U', $latestTime[$metric][$period['key']])->format(DateTime::RFC3339);
|
||||
}
|
||||
$end = DateTime::createFromFormat('U', \strtotime('now'))->format(DateTime::RFC3339);
|
||||
|
||||
$table = $options['table']; //Which influxdb table to query for this metric
|
||||
$groupBy = empty($options['groupBy']) ? '' : ', "' . $options['groupBy'] . '"'; //Some sub level metrics may be grouped by other tags like collectionId, bucketId, etc
|
||||
$table = $options['table']; //Which influxdb table to query for this metric
|
||||
$groupBy = empty($options['groupBy']) ? '' : ', "' . $options['groupBy'] . '"'; //Some sub level metrics may be grouped by other tags like collectionId, bucketId, etc
|
||||
|
||||
$filters = $options['filters'] ?? []; // Some metrics might have additional filters, like function's status
|
||||
if (!empty($filters)) {
|
||||
$filters = ' AND ' . implode(' AND ', array_map(fn ($filter, $value) => "\"{$filter}\"='{$value}'", array_keys($filters), array_values($filters)));
|
||||
} else {
|
||||
$filters = '';
|
||||
}
|
||||
$filters = $options['filters'] ?? []; // Some metrics might have additional filters, like function's status
|
||||
if (!empty($filters)) {
|
||||
$filters = ' AND ' . implode(' AND ', array_map(fn ($filter, $value) => "\"{$filter}\"='{$value}'", array_keys($filters), array_values($filters)));
|
||||
} else {
|
||||
$filters = '';
|
||||
}
|
||||
|
||||
$query = "SELECT sum(value) AS \"value\" FROM \"{$table}\" WHERE \"time\" > '{$start}' AND \"time\" < '{$end}' AND \"metric_type\"='counter' {$filters} GROUP BY time({$period['key']}), \"projectId\" {$groupBy} FILL(null)";
|
||||
try {
|
||||
$result = $database->query($query);
|
||||
|
||||
$points = $result->getPoints();
|
||||
foreach ($points as $point) {
|
||||
$projectId = $point['projectId'];
|
||||
|
||||
if (!empty($projectId) && $projectId !== 'console') {
|
||||
$dbForProject->setNamespace('_project_' . $projectId);
|
||||
$metricUpdated = $metric;
|
||||
|
||||
if (!empty($groupBy)) {
|
||||
$groupedBy = $point[$options['groupBy']] ?? '';
|
||||
if (empty($groupedBy)) {
|
||||
continue;
|
||||
}
|
||||
$metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric);
|
||||
$query = "SELECT sum(value) AS \"value\" FROM \"{$table}\" WHERE \"time\" > '{$start}' AND \"time\" < '{$end}' AND \"metric_type\"='counter' {$filters} GROUP BY time({$period['key']}), \"projectId\" {$groupBy} FILL(null)";
|
||||
try {
|
||||
$result = $database->query($query);
|
||||
|
||||
$points = $result->getPoints();
|
||||
foreach ($points as $point) {
|
||||
$projectId = $point['projectId'];
|
||||
|
||||
if (!empty($projectId) && $projectId !== 'console') {
|
||||
$dbForProject->setNamespace('_project_' . $projectId);
|
||||
$metricUpdated = $metric;
|
||||
|
||||
if (!empty($groupBy)) {
|
||||
$groupedBy = $point[$options['groupBy']] ?? '';
|
||||
if (empty($groupedBy)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$time = \strtotime($point['time']);
|
||||
$id = \md5($time . '_' . $period['key'] . '_' . $metricUpdated); //Construct unique id for each metric using time, period and metric
|
||||
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
||||
|
||||
try {
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period['key'],
|
||||
'time' => $time,
|
||||
'metric' => $metricUpdated,
|
||||
'value' => $value,
|
||||
'type' => 0,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $value)
|
||||
);
|
||||
}
|
||||
$latestTime[$metric][$period['key']] = $time;
|
||||
} catch (\Exception $e) { // if projects are deleted this might fail
|
||||
Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}: {$e->getMessage()}");
|
||||
$metricUpdated = str_replace($options['groupBy'], $groupedBy, $metric);
|
||||
}
|
||||
|
||||
$time = \strtotime($point['time']);
|
||||
$id = \md5($time . '_' . $period['key'] . '_' . $metricUpdated); //Construct unique id for each metric using time, period and metric
|
||||
$value = (!empty($point['value'])) ? $point['value'] : 0;
|
||||
|
||||
try {
|
||||
$document = $dbForProject->getDocument('stats', $id);
|
||||
if ($document->isEmpty()) {
|
||||
$dbForProject->createDocument('stats', new Document([
|
||||
'$id' => $id,
|
||||
'period' => $period['key'],
|
||||
'time' => $time,
|
||||
'metric' => $metricUpdated,
|
||||
'value' => $value,
|
||||
'type' => 0,
|
||||
]));
|
||||
} else {
|
||||
$dbForProject->updateDocument(
|
||||
'stats',
|
||||
$document->getId(),
|
||||
$document->setAttribute('value', $value)
|
||||
);
|
||||
}
|
||||
$latestTime[$metric][$period['key']] = $time;
|
||||
} catch (\Exception $e) { // if projects are deleted this might fail
|
||||
Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}: {$e->getMessage()}");
|
||||
}
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Console::warning("Failed to Query: {$e->getMessage()}");
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Console::warning("Failed to Query: {$e->getMessage()}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue