fix after rebase
This commit is contained in:
parent
81cf991695
commit
b9d95f769c
140
app/realtime.php
140
app/realtime.php
|
@ -4,6 +4,7 @@ use Appwrite\Auth\Auth;
|
||||||
use Appwrite\Database\Adapter\Redis as RedisAdapter;
|
use Appwrite\Database\Adapter\Redis as RedisAdapter;
|
||||||
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
|
use Appwrite\Database\Adapter\MySQL as MySQLAdapter;
|
||||||
use Appwrite\Database\Database;
|
use Appwrite\Database\Database;
|
||||||
|
use Appwrite\Database\Validator\Authorization;
|
||||||
use Appwrite\Event\Event;
|
use Appwrite\Event\Event;
|
||||||
use Appwrite\Messaging\Adapter\Realtime;
|
use Appwrite\Messaging\Adapter\Realtime;
|
||||||
use Appwrite\Network\Validator\Origin;
|
use Appwrite\Network\Validator\Origin;
|
||||||
|
@ -39,14 +40,70 @@ $stats->column('connectionsTotal', Table::TYPE_INT);
|
||||||
$stats->column('messages', Table::TYPE_INT);
|
$stats->column('messages', Table::TYPE_INT);
|
||||||
$stats->create();
|
$stats->create();
|
||||||
|
|
||||||
|
$containerId = uniqid();
|
||||||
|
$documentId = null;
|
||||||
|
|
||||||
$server = new Server($adapter);
|
$server = new Server($adapter);
|
||||||
|
|
||||||
$realtime = new Realtime();
|
$realtime = new Realtime();
|
||||||
|
|
||||||
$server->onStart(function () use ($stats) {
|
$server->onStart(function () use ($stats, $register, $containerId, &$documentId) {
|
||||||
Console::success('Server started succefully');
|
Console::success('Server started succefully');
|
||||||
|
|
||||||
Timer::tick(10000, function () use ($stats) {
|
$getConsoleDb = function () use ($register) {
|
||||||
|
$db = $register->get('dbPool')->get();
|
||||||
|
$cache = $register->get('redisPool')->get();
|
||||||
|
|
||||||
|
$consoleDb = new Database();
|
||||||
|
$consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
|
||||||
|
$consoleDb->setNamespace('app_console');
|
||||||
|
$consoleDb->setMocks(Config::getParam('collections', []));
|
||||||
|
|
||||||
|
return [
|
||||||
|
$consoleDb,
|
||||||
|
function () use ($register, $db, $cache) {
|
||||||
|
$register->get('dbPool')->put($db);
|
||||||
|
$register->get('redisPool')->put($cache);
|
||||||
|
}
|
||||||
|
];
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create document for this worker for connection stats across Containers.
|
||||||
|
*/
|
||||||
|
go(function () use ($getConsoleDb, $containerId, &$documentId) {
|
||||||
|
try {
|
||||||
|
[$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb);
|
||||||
|
$document = [
|
||||||
|
'$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS,
|
||||||
|
'$permissions' => [
|
||||||
|
'read' => ['*'],
|
||||||
|
'write' => ['*'],
|
||||||
|
],
|
||||||
|
'container' => $containerId,
|
||||||
|
'timestamp' => time(),
|
||||||
|
'value' => '{}'
|
||||||
|
];
|
||||||
|
Authorization::disable();
|
||||||
|
$document = $consoleDb->createDocument($document);
|
||||||
|
Authorization::enable();
|
||||||
|
$documentId = $document->getId();
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
Console::error('[Error] Type: ' . get_class($th));
|
||||||
|
Console::error('[Error] Message: ' . $th->getMessage());
|
||||||
|
Console::error('[Error] File: ' . $th->getFile());
|
||||||
|
Console::error('[Error] Line: ' . $th->getLine());
|
||||||
|
} finally {
|
||||||
|
call_user_func($returnConsoleDb);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Save current connections to the Database every 5 seconds.
|
||||||
|
*/
|
||||||
|
Timer::tick(5000, function () use ($stats, $getConsoleDb, $containerId, &$documentId) {
|
||||||
|
[$consoleDb, $returnConsoleDb] = call_user_func($getConsoleDb);
|
||||||
|
|
||||||
foreach ($stats as $projectId => $value) {
|
foreach ($stats as $projectId => $value) {
|
||||||
if (empty($value['connections']) && empty($value['messages'])) {
|
if (empty($value['connections']) && empty($value['messages'])) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -73,6 +130,36 @@ $server->onStart(function () use ($stats) {
|
||||||
$usage->trigger();
|
$usage->trigger();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
$payload = [];
|
||||||
|
foreach ($stats as $projectId => $value) {
|
||||||
|
if (!empty($value['connectionsTotal'])) {
|
||||||
|
$payload[$projectId] = $value['connectionsTotal'];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (empty($payload)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
$document = [
|
||||||
|
'$id' => $documentId,
|
||||||
|
'$collection' => Database::SYSTEM_COLLECTION_CONNECTIONS,
|
||||||
|
'$permissions' => [
|
||||||
|
'read' => ['*'],
|
||||||
|
'write' => ['*'],
|
||||||
|
],
|
||||||
|
'container' => $containerId,
|
||||||
|
'timestamp' => time(),
|
||||||
|
'value' => json_encode($payload)
|
||||||
|
];
|
||||||
|
try {
|
||||||
|
$document = $consoleDb->updateDocument($document);
|
||||||
|
} catch (\Throwable $th) {
|
||||||
|
Console::error('[Error] Type: ' . get_class($th));
|
||||||
|
Console::error('[Error] Message: ' . $th->getMessage());
|
||||||
|
Console::error('[Error] File: ' . $th->getFile());
|
||||||
|
Console::error('[Error] Line: ' . $th->getLine());
|
||||||
|
} finally {
|
||||||
|
call_user_func($returnConsoleDb);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -81,27 +168,56 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
||||||
|
|
||||||
$attempts = 0;
|
$attempts = 0;
|
||||||
$start = time();
|
$start = time();
|
||||||
$redisPool = $register->get('redisPool');
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sending current connections to project channels on the console project every 5 seconds.
|
* Sending current connections to project channels on the console project every 5 seconds.
|
||||||
*/
|
*/
|
||||||
Timer::tick(5000, function () use ($server, $stats, $realtime) {
|
Timer::tick(5000, function () use ($server, $register, $realtime) {
|
||||||
if ($realtime->hasSubscriber('console', 'role:member', 'project')) {
|
if ($realtime->hasSubscriber('console', 'role:member', 'project')) {
|
||||||
|
$db = $register->get('dbPool')->get();
|
||||||
|
$cache = $register->get('redisPool')->get();
|
||||||
|
|
||||||
|
$consoleDb = new Database();
|
||||||
|
$consoleDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
|
||||||
|
$consoleDb->setNamespace('app_console');
|
||||||
|
$consoleDb->setMocks(Config::getParam('collections', []));
|
||||||
|
|
||||||
|
$projectDb = new Database();
|
||||||
|
$projectDb->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
|
||||||
|
$projectDb->setMocks(Config::getParam('collections', []));
|
||||||
|
|
||||||
$payload = [];
|
$payload = [];
|
||||||
foreach ($stats as $projectId => $value) {
|
$list = $consoleDb->getCollection([
|
||||||
$payload[$projectId] = $value['connectionsTotal'];
|
'filters' => [
|
||||||
|
'$collection=' . Database::SYSTEM_COLLECTION_CONNECTIONS,
|
||||||
|
'timestamp>' . (time() - 15)
|
||||||
|
],
|
||||||
|
]);
|
||||||
|
|
||||||
|
foreach ($list as $document) {
|
||||||
|
foreach (json_decode($document->getAttribute('value')) as $projectId => $value) {
|
||||||
|
if (array_key_exists($projectId, $payload)) {
|
||||||
|
$payload[$projectId] += $value;
|
||||||
|
} else {
|
||||||
|
$payload[$projectId] = $value;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$event = [
|
$event = [
|
||||||
'event' => 'stats.connections',
|
'project' => 'console',
|
||||||
'channels' => ['project'],
|
|
||||||
'permissions' => ['role:member'],
|
'permissions' => ['role:member'],
|
||||||
'timestamp' => time(),
|
'data' => [
|
||||||
'payload' => $payload
|
'event' => 'stats.connections',
|
||||||
|
'channels' => ['project'],
|
||||||
|
'timestamp' => time(),
|
||||||
|
'payload' => $payload
|
||||||
|
]
|
||||||
];
|
];
|
||||||
|
|
||||||
$server->send($realtime->getReceivers($event), json_encode($event));
|
$server->send($realtime->getReceivers($event), json_encode($event));
|
||||||
|
$register->get('dbPool')->put($db);
|
||||||
|
$register->get('redisPool')->put($cache);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -115,7 +231,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
||||||
$start = time();
|
$start = time();
|
||||||
|
|
||||||
/** @var Redis $redis */
|
/** @var Redis $redis */
|
||||||
$redis = $redisPool->get();
|
$redis = $register->get('redisPool')->get();
|
||||||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
|
||||||
|
|
||||||
if ($redis->ping(true)) {
|
if ($redis->ping(true)) {
|
||||||
|
@ -177,7 +293,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
|
||||||
});
|
});
|
||||||
} catch (\Throwable $th) {
|
} catch (\Throwable $th) {
|
||||||
Console::error('Pub/sub error: ' . $th->getMessage());
|
Console::error('Pub/sub error: ' . $th->getMessage());
|
||||||
$redisPool->put($redis);
|
$register->get('redisPool')->put($redis);
|
||||||
$attempts++;
|
$attempts++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue