2021-08-10 20:44:31 +12:00
< ? php
2021-08-11 22:15:00 +12:00
global $cli , $register ;
2021-08-10 20:44:31 +12:00
2021-08-13 20:39:46 +12:00
require_once __DIR__ . '/../init.php' ;
2021-08-10 20:44:31 +12:00
use Utopia\App ;
2021-08-13 20:39:46 +12:00
use Utopia\Cache\Adapter\Redis ;
2021-08-11 22:15:00 +12:00
use Utopia\Cache\Cache ;
2021-08-10 20:44:31 +12:00
use Utopia\CLI\Console ;
2021-08-11 22:15:00 +12:00
use Utopia\Database\Adapter\MariaDB ;
use Utopia\Database\Database ;
use Utopia\Database\Document ;
use Utopia\Database\Validator\Authorization ;
2021-08-10 20:44:31 +12:00
/**
 * CLI task "usage".
 *
 * Periodically queries InfluxDB (database "telegraf") for usage counters,
 * aggregates them per project and per period, and upserts the results into
 * each project's internal "stats" collection.
 */
$cli
    ->task('usage')
    ->desc('Schedules syncing data from influxdb to Appwrite console db')
    ->action(function () use ($register) {
        Console::title('Usage Sync V1');
        Console::success(APP_NAME . ' usage sync process v1 has started');

        // Seconds between two aggregation runs; falls back to 30 when the env var is unset.
        $interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '30');

        // Aggregation windows: 'key' is the InfluxDB GROUP BY time() bucket size,
        // 'startTime' is how far back the first query of this process reaches.
        $periods = [
            [
                'key' => '30m',
                'startTime' => '-24 hours',
            ],
            [
                'key' => '1d',
                'startTime' => '-90 days',
            ],
        ];

        // Stored metric name => InfluxDB measurement ('table') and optional tag
        // ('groupBy'). When 'groupBy' is set, the tag name inside the metric key
        // is a placeholder that is replaced with the tag value of each point.
        $globalMetrics = [
            'requests' => [
                'table' => 'appwrite_usage_requests_all',
            ],
            'network' => [
                'table' => 'appwrite_usage_network_all',
            ],
            'executions' => [
                'table' => 'appwrite_usage_executions_all',
            ],
            'database.collections.create' => [
                'table' => 'appwrite_usage_database_collections_create',
            ],
            'database.collections.read' => [
                'table' => 'appwrite_usage_database_collections_read',
            ],
            'database.collections.update' => [
                'table' => 'appwrite_usage_database_collections_update',
            ],
            'database.collections.delete' => [
                'table' => 'appwrite_usage_database_collections_delete',
            ],
            'database.documents.create' => [
                'table' => 'appwrite_usage_database_documents_create',
            ],
            'database.documents.read' => [
                'table' => 'appwrite_usage_database_documents_read',
            ],
            'database.documents.update' => [
                'table' => 'appwrite_usage_database_documents_update',
            ],
            'database.documents.delete' => [
                'table' => 'appwrite_usage_database_documents_delete',
            ],
            'database.collections.collectionId.documents.create' => [
                'table' => 'appwrite_usage_database_documents_create',
                'groupBy' => 'collectionId',
            ],
            'database.collections.collectionId.documents.read' => [
                'table' => 'appwrite_usage_database_documents_read',
                'groupBy' => 'collectionId',
            ],
            'database.collections.collectionId.documents.update' => [
                'table' => 'appwrite_usage_database_documents_update',
                'groupBy' => 'collectionId',
            ],
            'database.collections.collectionId.documents.delete' => [
                'table' => 'appwrite_usage_database_documents_delete',
                'groupBy' => 'collectionId',
            ],
            'storage.buckets.bucketId.files.create' => [
                'table' => 'appwrite_usage_storage_files_create',
                'groupBy' => 'bucketId',
            ],
            'storage.buckets.bucketId.files.read' => [
                'table' => 'appwrite_usage_storage_files_read',
                'groupBy' => 'bucketId',
            ],
            'storage.buckets.bucketId.files.update' => [
                'table' => 'appwrite_usage_storage_files_update',
                'groupBy' => 'bucketId',
            ],
            'storage.buckets.bucketId.files.delete' => [
                'table' => 'appwrite_usage_storage_files_delete',
                'groupBy' => 'bucketId',
            ],
            'users.create' => [
                'table' => 'appwrite_usage_users_create',
            ],
            'users.read' => [
                'table' => 'appwrite_usage_users_read',
            ],
            'users.update' => [
                'table' => 'appwrite_usage_users_update',
            ],
            'users.delete' => [
                'table' => 'appwrite_usage_users_delete',
            ],
            // NOTE(review): the key contains no 'provider' placeholder, so the
            // per-provider series all resolve to the same metric name and the
            // same stats document. Confirm whether a placeholder is intended.
            'users.sessions.create' => [
                'table' => 'appwrite_usage_users_sessions_create',
                'groupBy' => 'provider',
            ],
            'users.sessions.delete' => [
                'table' => 'appwrite_usage_users_sessions_delete',
            ],
        ];

        $attempts = 0;
        $max = 10;
        $sleep = 1;

        // Fetch the database and cache connections, retrying up to $max times
        // with $sleep seconds between attempts.
        do {
            try {
                $attempts++;
                $db = $register->get('db');
                $redis = $register->get('cache');
                break; // leave the do-while if successful
            } catch (\Exception $e) {
                Console::warning("Database not ready. Retrying connection ({$attempts})...");
                if ($attempts >= $max) {
                    // Keep the original exception attached so the root cause is not lost.
                    throw new \Exception('Failed to connect to database: ' . $e->getMessage(), 0, $e);
                }
                sleep($sleep);
            }
        } while ($attempts < $max);

        $cacheAdapter = new Cache(new Redis($redis));
        $dbForProject = new Database(new MariaDB($db), $cacheAdapter);

        // Timestamp of the last successfully stored point, indexed by
        // [base metric key][period key]; used as the start of the next query.
        $latestTime = [];

        Authorization::disable();

        // Formats a unix timestamp the way the InfluxDB time filters below expect it.
        $formatTime = static function ($timestamp) {
            return DateTime::createFromFormat('U', (string) $timestamp)->format(DateTime::RFC3339);
        };

        Console::loop(function () use ($interval, $register, $dbForProject, $globalMetrics, $periods, $formatTime, &$latestTime) {
            $now = date('d-m-Y H:i:s', time());
            Console::info("[{$now}] Aggregating usage data every {$interval} seconds");

            $loopStart = microtime(true);
            $client = $register->get('influxdb');

            if ($client) {
                $database = $client->selectDB('telegraf');

                // sync data
                foreach ($globalMetrics as $metric => $options) {
                    $table = $options['table'];
                    $groupBy = $options['groupBy'] ?? '';
                    $groupClause = empty($groupBy) ? '' : ', "' . $groupBy . '"';

                    foreach ($periods as $period) {
                        $periodKey = $period['key'];

                        // Resume from the last stored point when there is one,
                        // otherwise start from the period's full look-back window.
                        $start = !empty($latestTime[$metric][$periodKey])
                            ? $formatTime($latestTime[$metric][$periodKey])
                            : $formatTime(\strtotime($period['startTime']));
                        $end = $formatTime(\strtotime('now'));

                        $query = 'SELECT sum(value) AS "value" FROM "' . $table . '"'
                            . ' WHERE time > \'' . $start . '\' AND time < \'' . $end . '\''
                            . ' AND "metric_type"=\'counter\''
                            . ' GROUP BY time(' . $periodKey . '), "projectId"' . $groupClause
                            . ' FILL(null)';

                        $result = $database->query($query);
                        $points = $result->getPoints();

                        foreach ($points as $point) {
                            $projectId = $point['projectId'] ?? '';

                            // Points without a project, or belonging to the console, are not stored.
                            if (empty($projectId) || $projectId === 'console') {
                                continue;
                            }

                            $dbForProject->setNamespace('project_' . $projectId . '_internal');

                            // Resolve the placeholder into a per-point metric name.
                            // The base $metric must stay untouched: it is the foreach
                            // key reused for every other point and period, and it is
                            // also the index into $latestTime.
                            $metricUpdated = $metric;
                            if (!empty($groupBy)) {
                                $groupedBy = $point[$groupBy] ?? '';
                                if (empty($groupedBy)) {
                                    continue;
                                }
                                $metricUpdated = str_replace($groupBy, $groupedBy, $metric);
                            }

                            $time = \strtotime($point['time']);
                            // Same time bucket + period + metric always maps to the same document id.
                            $id = \md5($time . '_' . $periodKey . '_' . $metricUpdated);
                            // FILL(null) yields null for empty buckets; store those as 0.
                            $value = (!empty($point['value'])) ? $point['value'] : 0;

                            try {
                                $document = $dbForProject->getDocument('stats', $id);
                                if ($document->isEmpty()) {
                                    $dbForProject->createDocument('stats', new Document([
                                        '$id' => $id,
                                        'period' => $periodKey,
                                        'time' => $time,
                                        'metric' => $metricUpdated,
                                        'value' => $value,
                                        'type' => 0,
                                    ]));
                                } else {
                                    $dbForProject->updateDocument(
                                        'stats',
                                        $document->getId(),
                                        $document->setAttribute('value', $value)
                                    );
                                }
                                // Written under the same key that is read when computing $start.
                                $latestTime[$metric][$periodKey] = $time;
                            } catch (\Exception $e) {
                                // Best effort: one failing point must not stop the whole run.
                                Console::warning("Failed to save data for project {$projectId} and metric {$metricUpdated}");
                            }
                        }
                    }
                }
            }

            $loopTook = microtime(true) - $loopStart;
            $now = date('d-m-Y H:i:s', time());
            Console::info("[{$now}] Aggregation took {$loopTook} seconds");
        }, $interval);
    });