Merge remote-tracking branch 'origin/refactor-workers' into refactor-workers
This commit is contained in:
commit
66c57c9a4f
7 changed files with 30 additions and 16 deletions
|
@ -332,6 +332,8 @@ services:
|
||||||
- mariadb
|
- mariadb
|
||||||
environment:
|
environment:
|
||||||
- _APP_ENV
|
- _APP_ENV
|
||||||
|
- _APP_WORKERS_NUM=1
|
||||||
|
- _APP_QUEUE_NAME=database_db_main
|
||||||
- _APP_WORKER_PER_CORE
|
- _APP_WORKER_PER_CORE
|
||||||
- _APP_OPENSSL_KEY_V1
|
- _APP_OPENSSL_KEY_V1
|
||||||
- _APP_REDIS_HOST
|
- _APP_REDIS_HOST
|
||||||
|
|
|
@ -18,13 +18,11 @@ use Swoole\Runtime;
|
||||||
use Utopia\App;
|
use Utopia\App;
|
||||||
use Utopia\Cache\Adapter\Sharding;
|
use Utopia\Cache\Adapter\Sharding;
|
||||||
use Utopia\Cache\Cache;
|
use Utopia\Cache\Cache;
|
||||||
use Utopia\CLI\CLI;
|
|
||||||
use Utopia\CLI\Console;
|
use Utopia\CLI\Console;
|
||||||
use Utopia\Config\Config;
|
use Utopia\Config\Config;
|
||||||
use Utopia\Database\Database;
|
use Utopia\Database\Database;
|
||||||
use Utopia\Database\Document;
|
use Utopia\Database\Document;
|
||||||
use Utopia\Database\Validator\Authorization;
|
use Utopia\Database\Validator\Authorization;
|
||||||
use Utopia\Queue\Adapter\Swoole;
|
|
||||||
use Utopia\Platform\Service;
|
use Utopia\Platform\Service;
|
||||||
use Utopia\Queue\Message;
|
use Utopia\Queue\Message;
|
||||||
use Utopia\Queue\Server;
|
use Utopia\Queue\Server;
|
||||||
|
@ -134,9 +132,6 @@ Server::setResource('queueForMails', function (Connection $queue) {
|
||||||
Server::setResource('queueForBuilds', function (Connection $queue) {
|
Server::setResource('queueForBuilds', function (Connection $queue) {
|
||||||
return new Build($queue);
|
return new Build($queue);
|
||||||
}, ['queue']);
|
}, ['queue']);
|
||||||
Server::setResource('queueForDatabase', function (Connection $queue) {
|
|
||||||
return new EventDatabase($queue);
|
|
||||||
}, ['queue']);
|
|
||||||
Server::setResource('queueForDeletes', function (Connection $queue) {
|
Server::setResource('queueForDeletes', function (Connection $queue) {
|
||||||
return new Delete($queue);
|
return new Delete($queue);
|
||||||
}, ['queue']);
|
}, ['queue']);
|
||||||
|
@ -216,17 +211,31 @@ $pools = $register->get('pools');
|
||||||
$platform = new Appwrite();
|
$platform = new Appwrite();
|
||||||
$args = $_SERVER['argv'];
|
$args = $_SERVER['argv'];
|
||||||
|
|
||||||
if (isset($args[0])) {
|
if (!isset($args[1])) {
|
||||||
$workerName = end($args);
|
|
||||||
} else {
|
|
||||||
Console::error('Missing worker name');
|
Console::error('Missing worker name');
|
||||||
|
Console::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
\array_shift($args);
|
||||||
|
$workerName = $args[0];
|
||||||
|
$workerIndex = $args[1] ?? '';
|
||||||
|
|
||||||
|
if (!empty($workerNum)) {
|
||||||
|
$workerName .= '_' . $workerIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
/**
|
||||||
|
* Any worker can be configured with the following env vars:
|
||||||
|
* - _APP_WORKERS_NUM The total number of worker processes
|
||||||
|
* - _APP_WORKER_PER_CORE The number of worker processes per core (ignored if _APP_WORKERS_NUM is set)
|
||||||
|
* - _APP_QUEUE_NAME The name of the queue to read for database events
|
||||||
|
*/
|
||||||
$platform->init(Service::TYPE_WORKER, [
|
$platform->init(Service::TYPE_WORKER, [
|
||||||
'workersNum' => strtolower($workerName) === 'databases'? 1 :swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)),
|
'workersNum' => App::getEnv('_APP_WORKERS_NUM', swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6))),
|
||||||
'connection' => $pools->get('queue')->pop()->getResource(),
|
'connection' => $pools->get('queue')->pop()->getResource(),
|
||||||
'workerName' => strtolower($workerName) ?? null,
|
'workerName' => strtolower($workerName) ?? null,
|
||||||
|
'queueName' => App::getEnv('_APP_QUEUE_NAME', 'v1-' . strtolower($workerName))
|
||||||
]);
|
]);
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
Console::error($e->getMessage() . ', File: ' . $e->getFile() . ', Line: ' . $e->getLine());
|
Console::error($e->getMessage() . ', File: ' . $e->getFile() . ', Line: ' . $e->getLine());
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
php /usr/src/code/app/worker.php databases $@
|
php /usr/src/code/app/worker.php databases $@
|
||||||
|
|
|
@ -56,7 +56,7 @@
|
||||||
"utopia-php/image": "0.5.*",
|
"utopia-php/image": "0.5.*",
|
||||||
"utopia-php/locale": "0.4.*",
|
"utopia-php/locale": "0.4.*",
|
||||||
"utopia-php/logger": "0.3.*",
|
"utopia-php/logger": "0.3.*",
|
||||||
"utopia-php/messaging": "0.1.*",
|
"utopia-php/messaging": "0.2.*",
|
||||||
"utopia-php/migration": "0.3.*",
|
"utopia-php/migration": "0.3.*",
|
||||||
"utopia-php/orchestration": "0.9.*",
|
"utopia-php/orchestration": "0.9.*",
|
||||||
"utopia-php/platform": "dev-integrate-workers as 0.3.3",
|
"utopia-php/platform": "dev-integrate-workers as 0.3.3",
|
||||||
|
@ -64,7 +64,7 @@
|
||||||
"utopia-php/preloader": "0.2.*",
|
"utopia-php/preloader": "0.2.*",
|
||||||
"utopia-php/queue": "dev-feat-get-worker-start as 0.5.3",
|
"utopia-php/queue": "dev-feat-get-worker-start as 0.5.3",
|
||||||
"utopia-php/registry": "0.5.*",
|
"utopia-php/registry": "0.5.*",
|
||||||
"utopia-php/storage": "0.14.*",
|
"utopia-php/storage": "0.17.*",
|
||||||
"utopia-php/swoole": "0.5.*",
|
"utopia-php/swoole": "0.5.*",
|
||||||
"utopia-php/vcs": "0.5.*",
|
"utopia-php/vcs": "0.5.*",
|
||||||
"utopia-php/websocket": "0.1.*",
|
"utopia-php/websocket": "0.1.*",
|
||||||
|
|
2
composer.lock
generated
2
composer.lock
generated
|
@ -5314,5 +5314,5 @@
|
||||||
"platform-overrides": {
|
"platform-overrides": {
|
||||||
"php": "8.0"
|
"php": "8.0"
|
||||||
},
|
},
|
||||||
"plugin-api-version": "2.2.0"
|
"plugin-api-version": "2.3.0"
|
||||||
}
|
}
|
||||||
|
|
|
@ -372,6 +372,8 @@ services:
|
||||||
- mariadb
|
- mariadb
|
||||||
environment:
|
environment:
|
||||||
- _APP_ENV
|
- _APP_ENV
|
||||||
|
- _APP_WORKERS_NUM=1
|
||||||
|
- _APP_QUEUE_NAME=database_db_main
|
||||||
- _APP_WORKER_PER_CORE
|
- _APP_WORKER_PER_CORE
|
||||||
- _APP_OPENSSL_KEY_V1
|
- _APP_OPENSSL_KEY_V1
|
||||||
- _APP_REDIS_HOST
|
- _APP_REDIS_HOST
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
namespace Appwrite\Event;
|
namespace Appwrite\Event;
|
||||||
|
|
||||||
|
use Utopia\App;
|
||||||
use Utopia\Database\Document;
|
use Utopia\Database\Document;
|
||||||
use Utopia\Queue\Client;
|
use Utopia\Queue\Client;
|
||||||
use Utopia\Queue\Connection;
|
use Utopia\Queue\Connection;
|
||||||
|
@ -17,9 +18,7 @@ class Database extends Event
|
||||||
{
|
{
|
||||||
parent::__construct($connection);
|
parent::__construct($connection);
|
||||||
|
|
||||||
$this
|
$this->setClass(Event::DATABASE_CLASS_NAME);
|
||||||
->setQueue(Event::DATABASE_QUEUE_NAME)
|
|
||||||
->setClass(Event::DATABASE_CLASS_NAME);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -109,6 +108,8 @@ class Database extends Event
|
||||||
*/
|
*/
|
||||||
public function trigger(): string|bool
|
public function trigger(): string|bool
|
||||||
{
|
{
|
||||||
|
$this->setQueue($this->getProject()->getAttribute('database'));
|
||||||
|
|
||||||
$client = new Client($this->queue, $this->connection);
|
$client = new Client($this->queue, $this->connection);
|
||||||
|
|
||||||
return $client->enqueue([
|
return $client->enqueue([
|
||||||
|
|
Loading…
Reference in a new issue