1
0
Fork 0
mirror of synced 2024-06-29 03:30:34 +12:00

feat(realtime): shared usage stats over database

This commit is contained in:
Torsten Dittmann 2021-06-15 17:11:07 +02:00
parent 7adcdf916f
commit 4847a0b674
2 changed files with 113 additions and 16 deletions

View file

@ -1,6 +1,8 @@
<?php
use Appwrite\Extend\PDO;
use Appwrite\Realtime\Server;
use Utopia\App;
require_once __DIR__ . '/init.php';
@ -10,4 +12,28 @@ $config = [
'package_max_length' => 64000 // Default maximum Package Size (64kb)
];
$register->set('db', function () {
$dbHost = App::getEnv('_APP_DB_HOST', '');
$dbUser = App::getEnv('_APP_DB_USER', '');
$dbPass = App::getEnv('_APP_DB_PASS', '');
$dbScheme = App::getEnv('_APP_DB_SCHEMA', '');
$pdo = new PDO("mysql:host={$dbHost};dbname={$dbScheme};charset=utf8mb4", $dbUser, $dbPass, array(
PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8mb4',
PDO::ATTR_TIMEOUT => 3, // Seconds
PDO::ATTR_PERSISTENT => true,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
));
return $pdo;
});
$register->set('cache', function () { // Register cache connection
$redis = new Redis();
$redis->pconnect(App::getEnv('_APP_REDIS_HOST', ''), App::getEnv('_APP_REDIS_PORT', ''));
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
return $redis;
});
$realtimeServer = new Server($register, config: $config);

View file

@ -5,6 +5,7 @@ namespace Appwrite\Realtime;
use Appwrite\Database\Database;
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
use Appwrite\Database\Adapter\Redis as RedisAdapter;
use Appwrite\Database\Validator\Authorization;
use Appwrite\Event\Event;
use Appwrite\Network\Validator\Origin;
use Appwrite\Utopia\Response;
@ -42,10 +43,10 @@ class Server
public SwooleServer $server;
/**
* Container scoped Table.
* Container scoped Usage Table.
* @var Table
*/
public Table $stats;
public Table $usage;
/**
* Container scoped Database connection.
@ -77,12 +78,12 @@ class Server
$this->connections = [];
$this->register = $register;
$this->stats = new Table(4096, 1);
$this->stats->column('projectId', Table::TYPE_STRING, 64);
$this->stats->column('connections', Table::TYPE_INT);
$this->stats->column('connectionsTotal', Table::TYPE_INT);
$this->stats->column('messages', Table::TYPE_INT);
$this->stats->create();
$this->usage = new Table(4096, 1);
$this->usage->column('projectId', Table::TYPE_STRING, 64);
$this->usage->column('connections', Table::TYPE_INT);
$this->usage->column('connectionsTotal', Table::TYPE_INT);
$this->usage->column('messages', Table::TYPE_INT);
$this->usage->create();
$this->db = new Database();
$this->db->setAdapter(new RedisAdapter(new MySQLAdapter($this->register), $this->register));
@ -96,6 +97,7 @@ class Server
$this->server->on('open', [$this, 'onOpen']);
$this->server->on('message', [$this, 'onMessage']);
$this->server->on('close', [$this, 'onClose']);
$this->server->container_id = uniqid();
$this->server->start();
}
@ -110,9 +112,34 @@ class Server
Console::success('Server started succefully');
Console::info("Master pid {$server->master_pid}, manager pid {$server->manager_pid}");
try {
go(function() {
$document = [
'$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS,
'$permissions' => [
'read' => ['*'],
'write' => ['*'],
],
'container' => $this->server->container_id,
'timestamp' => time(),
'data' => '{}'
];
Authorization::disable();
$document = $this->db->createDocument($document);
Authorization::enable();
$this->server->document_id = $document->getId();
});
} catch (\Throwable $th) {
Console::error('[Error] Type: '.get_class($th));
Console::error('[Error] Message: '.$th->getMessage());
Console::error('[Error] File: '.$th->getFile());
Console::error('[Error] Line: '.$th->getLine());
}
Timer::tick(10000, function () {
/** @var Table $stats */
foreach ($this->stats as $projectId => $value) {
foreach ($this->usage as $projectId => $value) {
if (empty($value['connections']) && empty($value['messages'])) {
continue;
}
@ -128,7 +155,7 @@ class Server
->setParam('networkRequestSize', 0)
->setParam('networkResponseSize', 0);
$this->stats->set($projectId, [
$this->usage->set($projectId, [
'projectId' => $projectId,
'messages' => 0,
'connections' => 0
@ -140,6 +167,37 @@ class Server
}
});
Timer::tick(10000, function () {
$payload = [];
foreach ($this->usage as $projectId => $value) {
if (!empty($value['connectionsTotal'])) {
$payload[$projectId] = $value['connectionsTotal'];
}
}
if (empty($payload)){
return;
}
$document = [
'$id' => $this->server->document_id,
'$collection' => Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS,
'$permissions' => [
'read' => ['*'],
'write' => ['*'],
],
'container' => $this->server->container_id,
'timestamp' => time(),
'data' => json_encode($payload)
];
try {
$document = $this->db->updateDocument($document);
} catch (\Throwable $th) {
Console::error('[Error] Type: '.get_class($th));
Console::error('[Error] Message: '.$th->getMessage());
Console::error('[Error] File: '.$th->getFile());
Console::error('[Error] Line: '.$th->getLine());
}
});
Process::signal(2, function () use ($server) {
Console::log('Stop by Ctrl+C');
$server->shutdown();
@ -302,8 +360,8 @@ class Server
$server->push($connection, json_encode($channels));
$this->stats->incr($project->getId(), 'connections');
$this->stats->incr($project->getId(), 'connectionsTotal');
$this->usage->incr($project->getId(), 'connections');
$this->usage->incr($project->getId(), 'connectionsTotal');
} catch (\Throwable $th) {
$response = [
'code' => $th->getCode(),
@ -353,7 +411,7 @@ class Server
public function onClose(SwooleServer $server, int $connection)
{
if (array_key_exists($connection, $this->connections)) {
$this->stats->decr($this->connections[$connection]['projectId'], 'connectionsTotal');
$this->usage->decr($this->connections[$connection]['projectId'], 'connectionsTotal');
}
Parser::unsubscribe($connection, $this->subscriptions, $this->connections);
Console::info('Connection close: ' . $connection);
@ -405,7 +463,7 @@ class Server
}
}
if (($num = count($receivers)) > 0) {
$this->stats->incr($event['project'], 'messages', $num);
$this->usage->incr($event['project'], 'messages', $num);
}
}
@ -423,8 +481,21 @@ class Server
&& array_key_exists('project', $this->subscriptions['console']['role:member'])
) {
$payload = [];
foreach ($this->stats as $projectId => $value) {
$payload[$projectId] = $value['connectionsTotal'];
$list = $this->db->getCollection([
'filters' => [
'$collection='.Database::SYSTEM_COLLECTION_REALTIME_CONNECTIONS,
'timestamp>'.(time() - 15)
],
]);
foreach ($list as $document) {
foreach (json_decode($document->getAttribute('data')) as $projectId => $value) {
if (array_key_exists($projectId, $payload)) {
$payload[$projectId] += $value;
} else {
$payload[$projectId] = $value;
}
}
}
foreach ($this->subscriptions['console']['role:member']['project'] as $connection => $value) {
$server->push(