Add dynamic queue setting for database worker
This commit is contained in:
parent
2cf0ed1d69
commit
ca5b7f5b16
|
@ -4,6 +4,7 @@ use Appwrite\Event\Event;
|
||||||
use Appwrite\Extend\Exception;
|
use Appwrite\Extend\Exception;
|
||||||
use Appwrite\Messaging\Adapter\Realtime;
|
use Appwrite\Messaging\Adapter\Realtime;
|
||||||
use Appwrite\Resque\Worker;
|
use Appwrite\Resque\Worker;
|
||||||
|
use Utopia\App;
|
||||||
use Utopia\CLI\Console;
|
use Utopia\CLI\Console;
|
||||||
use Utopia\Database\Database;
|
use Utopia\Database\Database;
|
||||||
use Utopia\Database\Document;
|
use Utopia\Database\Document;
|
||||||
|
@ -14,10 +15,43 @@ require_once __DIR__ . '/../init.php';
|
||||||
Console::title('Database V1 Worker');
|
Console::title('Database V1 Worker');
|
||||||
Console::success(APP_NAME . ' database worker v1 has started' . "\n");
|
Console::success(APP_NAME . ' database worker v1 has started' . "\n");
|
||||||
|
|
||||||
|
$table = new Swoole\Table(1);
|
||||||
|
$table->column('workerCount', Swoole\Table::TYPE_INT);
|
||||||
|
$table->create();
|
||||||
|
$table->set('databases', ['workerCount' => 0]);
|
||||||
|
|
||||||
|
$lock = new Swoole\Lock(SWOOLE_MUTEX);
|
||||||
|
|
||||||
class DatabaseV1 extends Worker
|
class DatabaseV1 extends Worker
|
||||||
{
|
{
|
||||||
public function init(): void
|
public function init(): void
|
||||||
{
|
{
|
||||||
|
global $table, $lock;
|
||||||
|
|
||||||
|
$dbQueues = App::getEnv('_APP_CONNECTIONS_DB_QUEUES');
|
||||||
|
|
||||||
|
if (empty($dbQueues)) {
|
||||||
|
$queue = 'v1-database';
|
||||||
|
} elseif (\str_contains($dbQueues, ',')) {
|
||||||
|
$dbQueues = \explode(',', $dbQueues);
|
||||||
|
$dbQueues = \array_map('trim', $dbQueues);
|
||||||
|
$dbQueues = \array_filter($dbQueues);
|
||||||
|
$dbQueues = \array_values($dbQueues);
|
||||||
|
|
||||||
|
$count = $table->get('databases', 'workerCount');
|
||||||
|
|
||||||
|
Console::log('Database worker count: ' . $count);
|
||||||
|
|
||||||
|
$queue = $dbQueues[$count];
|
||||||
|
} else {
|
||||||
|
$queue = \trim($dbQueues);
|
||||||
|
}
|
||||||
|
|
||||||
|
\putenv('QUEUE=' . $queue);
|
||||||
|
|
||||||
|
$lock->lock();
|
||||||
|
$table->incr('databases', 'workerCount');
|
||||||
|
$lock->unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function run(): void
|
public function run(): void
|
||||||
|
@ -58,6 +92,11 @@ class DatabaseV1 extends Worker
|
||||||
|
|
||||||
public function shutdown(): void
|
public function shutdown(): void
|
||||||
{
|
{
|
||||||
|
global $table, $lock;
|
||||||
|
|
||||||
|
$lock->lock();
|
||||||
|
$table->decr('databases', 'workerCount');
|
||||||
|
$lock->unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue