Simplify worker
This commit is contained in:
parent
44875e2d18
commit
9e4a65605c
|
@ -7,12 +7,17 @@ use Swoole\Runtime;
|
|||
use Utopia\App;
|
||||
use Utopia\Cache\Adapter\Sharding;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Queue\Adapter\Swoole;
|
||||
use Utopia\Queue\Message;
|
||||
use Utopia\Queue\Server;
|
||||
use Utopia\Registry\Registry;
|
||||
use Utopia\Logger\Log;
|
||||
|
||||
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||
|
||||
global $register;
|
||||
|
||||
|
@ -91,4 +96,53 @@ $pools = $register->get('pools');
|
|||
$connection = $pools->get('queue')->pop()->getResource();
|
||||
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
|
||||
|
||||
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
|
||||
if(empty(App::getEnv('QUEUE'))) {
|
||||
throw new Exception('Please configure "QUEUE" environemnt variable.');
|
||||
}
|
||||
|
||||
$adapter = new Swoole($connection, $workerNumber, App::getEnv('QUEUE'));
|
||||
$server = new Server($adapter);
|
||||
|
||||
$server
|
||||
->error()
|
||||
->inject('error')
|
||||
->inject('logger')
|
||||
->inject('register')
|
||||
->action(function ($error, $logger, $register) {
|
||||
|
||||
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
if ($error instanceof PDOException) {
|
||||
throw $error;
|
||||
}
|
||||
|
||||
if ($error->getCode() >= 500 || $error->getCode() === 0) {
|
||||
$log = new Log();
|
||||
|
||||
$log->setNamespace("appwrite-worker");
|
||||
$log->setServer(\gethostname());
|
||||
$log->setVersion($version);
|
||||
$log->setType(Log::TYPE_ERROR);
|
||||
$log->setMessage($error->getMessage());
|
||||
$log->setAction('appwrite-worker-functions');
|
||||
$log->addTag('verboseType', get_class($error));
|
||||
$log->addTag('code', $error->getCode());
|
||||
$log->addExtra('file', $error->getFile());
|
||||
$log->addExtra('line', $error->getLine());
|
||||
$log->addExtra('trace', $error->getTraceAsString());
|
||||
$log->addExtra('detailedTrace', $error->getTrace());
|
||||
$log->addExtra('roles', \Utopia\Database\Validator\Authorization::$roles);
|
||||
|
||||
$isProduction = App::getEnv('_APP_ENV', 'development') === 'production';
|
||||
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
|
||||
|
||||
$logger->addLog($log);
|
||||
}
|
||||
|
||||
Console::error('[Error] Type: ' . get_class($error));
|
||||
Console::error('[Error] Message: ' . $error->getMessage());
|
||||
Console::error('[Error] File: ' . $error->getFile());
|
||||
Console::error('[Error] Line: ' . $error->getLine());
|
||||
|
||||
$register->get('pools')->reclaim();
|
||||
});
|
|
@ -20,19 +20,11 @@ use Utopia\Database\Permission;
|
|||
use Utopia\Database\Query;
|
||||
use Utopia\Database\Role;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Logger\Log;
|
||||
use Utopia\Queue\Adapter\Swoole;
|
||||
use Utopia\Queue\Server;
|
||||
|
||||
Authorization::disable();
|
||||
Authorization::setDefaultStatus(false);
|
||||
|
||||
global $connection;
|
||||
global $workerNumber;
|
||||
|
||||
$adapter = new Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME);
|
||||
$server = new Server($adapter);
|
||||
|
||||
Server::setResource('execute', function () {
|
||||
return function (
|
||||
Document $project,
|
||||
|
@ -380,49 +372,5 @@ $server->job()
|
|||
}
|
||||
});
|
||||
|
||||
$server
|
||||
->error()
|
||||
->inject('error')
|
||||
->inject('logger')
|
||||
->inject('register')
|
||||
->action(function ($error, $logger, $register) {
|
||||
|
||||
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
if ($error instanceof PDOException) {
|
||||
throw $error;
|
||||
}
|
||||
|
||||
if ($error->getCode() >= 500 || $error->getCode() === 0) {
|
||||
$log = new Log();
|
||||
|
||||
$log->setNamespace("appwrite-worker");
|
||||
$log->setServer(\gethostname());
|
||||
$log->setVersion($version);
|
||||
$log->setType(Log::TYPE_ERROR);
|
||||
$log->setMessage($error->getMessage());
|
||||
$log->setAction('appwrite-worker-functions');
|
||||
$log->addTag('verboseType', get_class($error));
|
||||
$log->addTag('code', $error->getCode());
|
||||
$log->addExtra('file', $error->getFile());
|
||||
$log->addExtra('line', $error->getLine());
|
||||
$log->addExtra('trace', $error->getTraceAsString());
|
||||
$log->addExtra('detailedTrace', $error->getTrace());
|
||||
$log->addExtra('roles', \Utopia\Database\Validator\Authorization::$roles);
|
||||
|
||||
$isProduction = App::getEnv('_APP_ENV', 'development') === 'production';
|
||||
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
|
||||
|
||||
$logger->addLog($log);
|
||||
}
|
||||
|
||||
Console::error('[Error] Type: ' . get_class($error));
|
||||
Console::error('[Error] Message: ' . $error->getMessage());
|
||||
Console::error('[Error] File: ' . $error->getFile());
|
||||
Console::error('[Error] Line: ' . $error->getLine());
|
||||
|
||||
$register->get('pools')->reclaim();
|
||||
});
|
||||
|
||||
$server->workerStart();
|
||||
$server->start();
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
#!/bin/sh
|
||||
|
||||
php /usr/src/code/app/workers/functions.php $@
|
||||
QUEUE=v1-functions php /usr/src/code/app/workers/functions.php $@
|
|
@ -60,7 +60,7 @@
|
|||
"utopia-php/platform": "0.3.*",
|
||||
"utopia-php/pools": "0.4.*",
|
||||
"utopia-php/preloader": "0.2.*",
|
||||
"utopia-php/registry": "dev-feat-allow-params as 0.5.0",
|
||||
"utopia-php/registry": "0.5.*",
|
||||
"utopia-php/storage": "0.11.*",
|
||||
"utopia-php/swoole": "0.5.*",
|
||||
"utopia-php/websocket": "0.1.0",
|
||||
|
|
Loading…
Reference in a new issue