refactor(workers): refactor the workers
This commit is contained in:
parent
1edb40130d
commit
424e153b80
|
@ -12,8 +12,6 @@ Console::success(APP_NAME.' audits worker v1 has started');
|
|||
|
||||
class AuditsV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
|
|
@ -18,8 +18,6 @@ Console::success(APP_NAME.' certificates worker v1 has started');
|
|||
|
||||
class CertificatesV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
|
|
@ -21,9 +21,7 @@ Console::success(APP_NAME.' deletes worker v1 has started'."\n");
|
|||
|
||||
class DeletesV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
|
||||
protected $consoleDB = null;
|
||||
protected Database $consoleDB = null;
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
|
|
|
@ -68,7 +68,7 @@ $stderr = '';
|
|||
$executionStart = \microtime(true);
|
||||
|
||||
$response = $orchestration->list(['label' => 'appwrite-type=function']);
|
||||
|
||||
/** @var Container[] $list */
|
||||
$list = [];
|
||||
|
||||
foreach ($response as $value) {
|
||||
|
@ -96,9 +96,9 @@ Console::info(count($list).' functions listed in ' . ($executionEnd - $execution
|
|||
|
||||
class FunctionsV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
public array $args = [];
|
||||
|
||||
public $allowed = [];
|
||||
public array $allowed = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
|
@ -133,7 +133,8 @@ class FunctionsV1 extends Worker
|
|||
$limit = 30;
|
||||
$sum = 30;
|
||||
$offset = 0;
|
||||
$functions = []; /** @var Document[] $functions */
|
||||
$functions = [];
|
||||
/** @var Document[] $functions */
|
||||
|
||||
while ($sum >= $limit) {
|
||||
|
||||
|
@ -169,7 +170,19 @@ class FunctionsV1 extends Worker
|
|||
|
||||
Console::success('Triggered function: ' . $event);
|
||||
|
||||
$this->execute('event', $projectId, '', $database, $function, $event, $eventData, $data, $webhooks, $userId, $jwt);
|
||||
$this->execute(
|
||||
trigger: 'event',
|
||||
projectId: $projectId,
|
||||
executionId: '',
|
||||
database: $database,
|
||||
function: $function,
|
||||
event: $event,
|
||||
eventData: $eventData,
|
||||
data: $data,
|
||||
webhooks: $webhooks,
|
||||
userId: $userId,
|
||||
jwt: $jwt
|
||||
);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -206,8 +219,7 @@ class FunctionsV1 extends Worker
|
|||
|
||||
$function
|
||||
->setAttribute('scheduleNext', $next)
|
||||
->setAttribute('schedulePrevious', \time())
|
||||
;
|
||||
->setAttribute('schedulePrevious', \time());
|
||||
|
||||
Authorization::disable();
|
||||
|
||||
|
@ -215,6 +227,10 @@ class FunctionsV1 extends Worker
|
|||
'scheduleNext' => $next,
|
||||
]));
|
||||
|
||||
if ($function === false) {
|
||||
throw new Exception('Function update failed (' . $functionId . ')');
|
||||
}
|
||||
|
||||
Authorization::reset();
|
||||
|
||||
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
|
||||
|
@ -226,7 +242,17 @@ class FunctionsV1 extends Worker
|
|||
'scheduleOriginal' => $function->getAttribute('schedule', ''),
|
||||
]); // Async task rescheduale
|
||||
|
||||
$this->execute($trigger, $projectId, $executionId, $database, $function, /*$event*/'', /*$eventData*/'', $data, $webhooks, $userId, $jwt);
|
||||
$this->execute(
|
||||
trigger: $trigger,
|
||||
projectId: $projectId,
|
||||
executionId: $executionId,
|
||||
database: $database,
|
||||
function: $function,
|
||||
data: $data,
|
||||
webhooks: $webhooks,
|
||||
userId: $userId,
|
||||
jwt: $jwt
|
||||
);
|
||||
break;
|
||||
|
||||
case 'http':
|
||||
|
@ -238,11 +264,17 @@ class FunctionsV1 extends Worker
|
|||
throw new Exception('Function not found (' . $functionId . ')');
|
||||
}
|
||||
|
||||
$this->execute($trigger, $projectId, $executionId, $database, $function, /*$event*/'', /*$eventData*/'', $data, $webhooks, $userId, $jwt);
|
||||
break;
|
||||
|
||||
default:
|
||||
# code...
|
||||
$this->execute(
|
||||
trigger: $trigger,
|
||||
projectId: $projectId,
|
||||
executionId: $executionId,
|
||||
database: $database,
|
||||
function: $function,
|
||||
data: $data,
|
||||
webhooks: $webhooks,
|
||||
userId: $userId,
|
||||
jwt: $jwt
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -254,7 +286,7 @@ class FunctionsV1 extends Worker
|
|||
* @param string $projectId
|
||||
* @param string $executionId
|
||||
* @param Database $database
|
||||
* @param Database $function
|
||||
* @param Document $function
|
||||
* @param string $event
|
||||
* @param string $eventData
|
||||
* @param string $data
|
||||
|
@ -297,7 +329,7 @@ class FunctionsV1 extends Worker
|
|||
'time' => 0,
|
||||
]);
|
||||
|
||||
if(false === $execution || ($execution instanceof Document && $execution->isEmpty())) {
|
||||
if ($execution->isEmpty()) {
|
||||
throw new Exception('Failed to create or read execution');
|
||||
}
|
||||
|
||||
|
@ -325,11 +357,11 @@ class FunctionsV1 extends Worker
|
|||
'APPWRITE_FUNCTION_JWT' => $jwt,
|
||||
'APPWRITE_FUNCTION_PROJECT_ID' => $projectId,
|
||||
]);
|
||||
|
||||
$tagId = $tag->getId() ?? '';
|
||||
$tagPath = $tag->getAttribute('path', '');
|
||||
$tagPathTarget = '/tmp/project-'.$projectId.'/'.$tag->getId().'/code.tar.gz';
|
||||
$tagPathTarget = '/tmp/project-' . $projectId . '/' . $tagId . '/code.tar.gz';
|
||||
$tagPathTargetDir = \pathinfo($tagPathTarget, PATHINFO_DIRNAME);
|
||||
$container = 'appwrite-function-'.$tag->getId();
|
||||
$container = 'appwrite-function-' . $tagId;
|
||||
$command = \escapeshellcmd($tag->getAttribute('command', ''));
|
||||
|
||||
if (!\is_readable($tagPath)) {
|
||||
|
@ -388,7 +420,8 @@ class FunctionsV1 extends Worker
|
|||
$id = $orchestration->run(
|
||||
image: $runtime['image'],
|
||||
name: $container,
|
||||
command: ['tail',
|
||||
command: [
|
||||
'tail',
|
||||
'-f',
|
||||
'/dev/null'
|
||||
],
|
||||
|
@ -400,7 +433,8 @@ class FunctionsV1 extends Worker
|
|||
labels: [
|
||||
'appwrite-type' => 'function',
|
||||
'appwrite-created' => strval($executionTime)
|
||||
]);
|
||||
]
|
||||
);
|
||||
|
||||
$untarStdout = '';
|
||||
$untarStderr = '';
|
||||
|
@ -415,7 +449,8 @@ class FunctionsV1 extends Worker
|
|||
stdout: $untarStdout,
|
||||
stderr: $untarStderr,
|
||||
vars: $vars,
|
||||
timeout: 60);
|
||||
timeout: 60
|
||||
);
|
||||
|
||||
if (!$untarSuccess) {
|
||||
throw new Exception('Failed to extract tar: ' . $untarStderr);
|
||||
|
@ -423,15 +458,18 @@ class FunctionsV1 extends Worker
|
|||
|
||||
$executionEnd = \microtime(true);
|
||||
|
||||
$list[$container] = new Container($container, $id, 'Up',
|
||||
$list[$container] = new Container(
|
||||
$container,
|
||||
$id,
|
||||
'Up',
|
||||
[
|
||||
'appwrite-type' => 'function',
|
||||
'appwrite-created' => strval($executionTime),
|
||||
]);
|
||||
]
|
||||
);
|
||||
|
||||
Console::info('Function created in ' . ($executionEnd - $executionStart) . ' seconds');
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
Console::info('Container is ready to run');
|
||||
}
|
||||
|
||||
|
@ -449,7 +487,8 @@ class FunctionsV1 extends Worker
|
|||
stdout: $stdout,
|
||||
stderr: $stderr,
|
||||
vars: $vars,
|
||||
timeout: $function->getAttribute('timeout', (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900)));
|
||||
timeout: $function->getAttribute('timeout', (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900))
|
||||
);
|
||||
} catch (TimeoutException $e) {
|
||||
$exitCode = 124;
|
||||
} catch (OrchestrationException $e) {
|
||||
|
@ -476,7 +515,7 @@ class FunctionsV1 extends Worker
|
|||
|
||||
Authorization::reset();
|
||||
|
||||
if (false === $function) {
|
||||
if ($execution === false) {
|
||||
throw new Exception('Failed saving execution to DB', 500);
|
||||
}
|
||||
|
||||
|
@ -501,8 +540,7 @@ class FunctionsV1 extends Worker
|
|||
->setParam('functionStatus', $functionStatus)
|
||||
->setParam('functionExecutionTime', $executionTime * 1000) // ms
|
||||
->setParam('networkRequestSize', 0)
|
||||
->setParam('networkResponseSize', 0)
|
||||
;
|
||||
->setParam('networkResponseSize', 0);
|
||||
|
||||
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
$usage->trigger();
|
||||
|
@ -518,7 +556,9 @@ class FunctionsV1 extends Worker
|
|||
*/
|
||||
public function cleanup(): void
|
||||
{
|
||||
/** @var Container[] $list */
|
||||
global $list;
|
||||
/** @var Orchestration $orchestration */
|
||||
global $orchestration;
|
||||
|
||||
Console::success(count($list) . ' running containers counted');
|
||||
|
@ -528,7 +568,7 @@ class FunctionsV1 extends Worker
|
|||
if (\count($list) > $max) {
|
||||
Console::info('Starting containers cleanup');
|
||||
|
||||
\uasort($list, function ($item1, $item2) {
|
||||
\uasort($list, function (Container $item1, Container $item2) {
|
||||
return (int)($item1->getLabels['appwrite-created'] ?? 0) <=> (int)($item2->getLabels['appwrite-created'] ?? 0);
|
||||
});
|
||||
|
||||
|
|
|
@ -13,11 +13,6 @@ Console::success(APP_NAME . ' mails worker v1 has started' . "\n");
|
|||
|
||||
class MailsV1 extends Worker
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
|
|
@ -17,11 +17,6 @@ Console::success(APP_NAME.' tasks worker v1 has started');
|
|||
|
||||
class TasksV1 extends Worker
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
|
|
@ -11,11 +11,6 @@ Console::success(APP_NAME.' usage worker v1 has started');
|
|||
|
||||
class UsageV1 extends Worker
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
|
|
@ -11,8 +11,6 @@ Console::success(APP_NAME.' webhooks worker v1 has started');
|
|||
|
||||
class WebhooksV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
@ -37,7 +35,7 @@ class WebhooksV1 extends Worker
|
|||
$name = $webhook['name'] ?? '';
|
||||
$signature = $webhook['signature'] ?? 'not-yet-implemented';
|
||||
$url = $webhook['url'] ?? '';
|
||||
$security = (bool) $webhook['security'] ?? true;
|
||||
$security = (bool) ($webhook['security'] ?? true);
|
||||
$httpUser = $webhook['httpUser'] ?? null;
|
||||
$httpPass = $webhook['httpPass'] ?? null;
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ namespace Appwrite\Resque;
|
|||
|
||||
abstract class Worker
|
||||
{
|
||||
public $args = [];
|
||||
public array $args = [];
|
||||
|
||||
abstract public function init(): void;
|
||||
|
||||
|
@ -17,7 +17,7 @@ abstract class Worker
|
|||
$this->init();
|
||||
}
|
||||
|
||||
public function perform()
|
||||
public function perform(): void
|
||||
{
|
||||
$this->run();
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue