add usage stats for realtime
This commit is contained in:
parent
30506c9b00
commit
c5ad66839d
|
@ -378,7 +378,7 @@ App::setResource('user', function($mode, $project, $console, $request, $response
|
|||
/** @var Appwrite\Database\Document $project */
|
||||
/** @var Appwrite\Database\Database $consoleDB */
|
||||
/** @var Appwrite\Database\Database $projectDB */
|
||||
/** @var bool $mode */
|
||||
/** @var string $mode */
|
||||
|
||||
Authorization::setDefaultStatus(true);
|
||||
|
||||
|
|
|
@ -4,12 +4,15 @@ require_once __DIR__ . '/init.php';
|
|||
|
||||
use Appwrite\Database\Pool\PDOPool;
|
||||
use Appwrite\Database\Pool\RedisPool;
|
||||
use Appwrite\Event\Event;
|
||||
use Appwrite\Network\Validator\Origin;
|
||||
use Appwrite\Realtime\Realtime;
|
||||
use Appwrite\Utopia\Response;
|
||||
use Swoole\Process;
|
||||
use Swoole\Http\Request;
|
||||
use Swoole\Http\Response as SwooleResponse;
|
||||
use Swoole\Process;
|
||||
use Swoole\Table;
|
||||
use Swoole\Timer;
|
||||
use Swoole\WebSocket\Frame;
|
||||
use Swoole\WebSocket\Server;
|
||||
use Utopia\App;
|
||||
|
@ -38,7 +41,46 @@ $server->set([
|
|||
$subscriptions = [];
|
||||
$connections = [];
|
||||
|
||||
$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$connections, &$register) {
|
||||
$stats = new Table(4096, 1);
|
||||
$stats->column('projectId', Table::TYPE_STRING, 64);
|
||||
$stats->column('connections', Table::TYPE_INT);
|
||||
$stats->column('messages', Table::TYPE_INT);
|
||||
$stats->create();
|
||||
|
||||
/**
|
||||
* Sends usage stats every 10 seconds.
|
||||
*/
|
||||
Timer::tick(10000, function () use (&$stats) {
|
||||
/** @var Table $stats */
|
||||
foreach ($stats as $projectId => $value) {
|
||||
if (empty($value['connections']) && empty($value['messages'])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$connections = $value['connections'];
|
||||
$messages = $value['messages'];
|
||||
|
||||
$usage = new Event('v1-usage', 'UsageV1');
|
||||
$usage
|
||||
->setParam('projectId', $projectId)
|
||||
->setParam('realtimeConnections', $connections)
|
||||
->setParam('realtimeMessages', $messages)
|
||||
->setParam('networkRequestSize', 0)
|
||||
->setParam('networkResponseSize', 0);
|
||||
|
||||
$stats->set($projectId, array(
|
||||
'projectId' => $projectId,
|
||||
'messages' => 0,
|
||||
'connections' => 0
|
||||
));
|
||||
|
||||
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
$usage->trigger();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$register, &$stats) {
|
||||
Console::success('Worker ' . $workerId . ' started succefully');
|
||||
|
||||
$attempts = 0;
|
||||
|
@ -63,7 +105,7 @@ $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &
|
|||
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
|
||||
}
|
||||
|
||||
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, &$subscriptions) {
|
||||
$redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, &$subscriptions, &$stats) {
|
||||
/**
|
||||
* Supported Resources:
|
||||
* - Collection
|
||||
|
@ -98,6 +140,9 @@ $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &
|
|||
$server->close($receiver);
|
||||
}
|
||||
}
|
||||
if (($num = count($receivers)) > 0) {
|
||||
$stats->incr($event['project'], 'messages', $num);
|
||||
}
|
||||
});
|
||||
} catch (\Throwable $th) {
|
||||
Console::error('Pub/sub error: ' . $th->getMessage());
|
||||
|
@ -124,7 +169,7 @@ $server->on('start', function (Server $server) {
|
|||
});
|
||||
});
|
||||
|
||||
$server->on('open', function (Server $server, Request $request) use (&$connections, &$subscriptions, &$register) {
|
||||
$server->on('open', function (Server $server, Request $request) use (&$connections, &$subscriptions, &$register, &$stats) {
|
||||
$app = new App('UTC');
|
||||
$connection = $request->fd;
|
||||
$request = new SwooleRequest($request);
|
||||
|
@ -135,11 +180,11 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
|
|||
$register->set('db', function () use (&$db) {
|
||||
return $db;
|
||||
});
|
||||
|
||||
|
||||
$register->set('cache', function () use (&$redis) { // Register cache connection
|
||||
return $redis;
|
||||
});
|
||||
|
||||
|
||||
Console::info("Connection open (user: {$connection}, worker: {$server->getWorkerId()})");
|
||||
|
||||
App::setResource('request', function () use ($request) {
|
||||
|
@ -213,6 +258,8 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio
|
|||
Realtime::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels);
|
||||
|
||||
$server->push($connection, json_encode($channels));
|
||||
|
||||
$stats->incr($project->getId(), 'connections');
|
||||
} catch (\Throwable $th) {
|
||||
$response = [
|
||||
'code' => $th->getCode(),
|
||||
|
|
|
@ -141,6 +141,7 @@ services:
|
|||
- _APP_DB_SCHEMA
|
||||
- _APP_DB_USER
|
||||
- _APP_DB_PASS
|
||||
- _APP_USAGE_STATS
|
||||
|
||||
appwrite-worker-usage:
|
||||
image: appwrite/appwrite:<?php echo $version."\n"; ?>
|
||||
|
|
|
@ -25,12 +25,13 @@ class UsageV1 extends Worker
|
|||
{
|
||||
global $register;
|
||||
|
||||
/** @var \Domnikl\Statsd\Client $statsd */
|
||||
$statsd = $register->get('statsd', true);
|
||||
|
||||
$projectId = $this->args['projectId'];
|
||||
|
||||
$networkRequestSize = $this->args['networkRequestSize'];
|
||||
$networkResponseSize = $this->args['networkResponseSize'];
|
||||
$networkRequestSize = $this->args['networkRequestSize'] ?? 0;
|
||||
$networkResponseSize = $this->args['networkResponseSize'] ?? 0;
|
||||
|
||||
$storage = $this->args['storage'] ?? null;
|
||||
|
||||
|
@ -42,7 +43,10 @@ class UsageV1 extends Worker
|
|||
$functionExecutionTime = $this->args['functionExecutionTime'] ?? null;
|
||||
$functionStatus = $this->args['functionStatus'] ?? null;
|
||||
|
||||
$tags = ",project={$projectId},version=".App::getEnv('_APP_VERSION', 'UNKNOWN').'';
|
||||
$realtimeConnections = $this->args['realtimeConnections'] ?? null;
|
||||
$realtimeMessages = $this->args['realtimeMessages'] ?? null;
|
||||
|
||||
$tags = ",project={$projectId},version=".App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
// the global namespace is prepended to every key (optional)
|
||||
$statsd->setNamespace('appwrite.usage');
|
||||
|
@ -53,10 +57,17 @@ class UsageV1 extends Worker
|
|||
|
||||
if($functionExecution >= 1) {
|
||||
$statsd->increment('executions.all'.$tags.',functionId='.$functionId.',functionStatus='.$functionStatus);
|
||||
var_dump($tags.',functionId='.$functionId.',functionStatus='.$functionStatus);
|
||||
$statsd->count('executions.time'.$tags.',functionId='.$functionId, $functionExecutionTime);
|
||||
}
|
||||
|
||||
if($realtimeConnections >= 1) {
|
||||
$statsd->count('realtime.clients'.$tags, $realtimeConnections);
|
||||
}
|
||||
|
||||
if($realtimeMessages >= 1) {
|
||||
$statsd->count('realtime.message'.$tags, $realtimeMessages);
|
||||
}
|
||||
|
||||
$statsd->count('network.inbound'.$tags, $networkRequestSize);
|
||||
$statsd->count('network.outbound'.$tags, $networkResponseSize);
|
||||
$statsd->count('network.all'.$tags, $networkRequestSize + $networkResponseSize);
|
||||
|
|
|
@ -161,6 +161,7 @@ services:
|
|||
- _APP_DB_SCHEMA
|
||||
- _APP_DB_USER
|
||||
- _APP_DB_PASS
|
||||
- _APP_USAGE_STATS
|
||||
|
||||
appwrite-worker-usage:
|
||||
entrypoint: worker-usage
|
||||
|
|
Loading…
Reference in a new issue