Merge pull request #1559 from TorstenDittmann/clean-up-functions-worker
refactor(workers): refactor the workers
This commit is contained in:
commit
8e7402951c
|
@ -5,15 +5,13 @@ use Utopia\Audit\Audit;
|
|||
use Utopia\Audit\Adapters\MySQL as AuditAdapter;
|
||||
use Utopia\CLI\Console;
|
||||
|
||||
require_once __DIR__.'/../workers.php';
|
||||
require_once __DIR__ . '/../workers.php';
|
||||
|
||||
Console::title('Audits V1 Worker');
|
||||
Console::success(APP_NAME.' audits worker v1 has started');
|
||||
Console::success(APP_NAME . ' audits worker v1 has started');
|
||||
|
||||
class AuditsV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
@ -30,9 +28,9 @@ class AuditsV1 extends Worker
|
|||
$ip = $this->args['ip'];
|
||||
$data = $this->args['data'];
|
||||
$db = $register->get('db', true);
|
||||
|
||||
|
||||
$adapter = new AuditAdapter($db);
|
||||
$adapter->setNamespace('app_'.$projectId);
|
||||
$adapter->setNamespace('app_' . $projectId);
|
||||
|
||||
$audit = new Audit($adapter);
|
||||
|
||||
|
@ -43,4 +41,4 @@ class AuditsV1 extends Worker
|
|||
{
|
||||
// ... Remove environment for this job
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,15 +11,13 @@ use Utopia\CLI\Console;
|
|||
use Utopia\Config\Config;
|
||||
use Utopia\Domains\Domain;
|
||||
|
||||
require_once __DIR__.'/../workers.php';
|
||||
require_once __DIR__ . '/../workers.php';
|
||||
|
||||
Console::title('Certificates V1 Worker');
|
||||
Console::success(APP_NAME.' certificates worker v1 has started');
|
||||
Console::success(APP_NAME . ' certificates worker v1 has started');
|
||||
|
||||
class CertificatesV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
@ -57,33 +55,33 @@ class CertificatesV1 extends Worker
|
|||
// Validation Args
|
||||
$validateTarget = $this->args['validateTarget'] ?? true;
|
||||
$validateCNAME = $this->args['validateCNAME'] ?? true;
|
||||
|
||||
|
||||
// Options
|
||||
$domain = new Domain((!empty($domain)) ? $domain : '');
|
||||
$expiry = 60 * 60 * 24 * 30 * 2; // 60 days
|
||||
$safety = 60 * 60; // 1 hour
|
||||
$renew = (\time() + $expiry);
|
||||
|
||||
if(empty($domain->get())) {
|
||||
if (empty($domain->get())) {
|
||||
throw new Exception('Missing domain');
|
||||
}
|
||||
|
||||
if(!$domain->isKnown() || $domain->isTest()) {
|
||||
if (!$domain->isKnown() || $domain->isTest()) {
|
||||
throw new Exception('Unknown public suffix for domain');
|
||||
}
|
||||
|
||||
if($validateTarget) {
|
||||
if ($validateTarget) {
|
||||
$target = new Domain(App::getEnv('_APP_DOMAIN_TARGET', ''));
|
||||
|
||||
if(!$target->isKnown() || $target->isTest()) {
|
||||
throw new Exception('Unreachable CNAME target ('.$target->get().'), please use a domain with a public suffix.');
|
||||
|
||||
if (!$target->isKnown() || $target->isTest()) {
|
||||
throw new Exception('Unreachable CNAME target (' . $target->get() . '), please use a domain with a public suffix.');
|
||||
}
|
||||
}
|
||||
|
||||
if($validateCNAME) {
|
||||
if ($validateCNAME) {
|
||||
$validator = new CNAME($target->get()); // Verify Domain with DNS records
|
||||
|
||||
if(!$validator->isValid($domain->get())) {
|
||||
|
||||
if (!$validator->isValid($domain->get())) {
|
||||
throw new Exception('Failed to verify domain DNS records');
|
||||
}
|
||||
}
|
||||
|
@ -92,8 +90,8 @@ class CertificatesV1 extends Worker
|
|||
'limit' => 1,
|
||||
'offset' => 0,
|
||||
'filters' => [
|
||||
'$collection='.Database::SYSTEM_COLLECTION_CERTIFICATES,
|
||||
'domain='.$domain->get(),
|
||||
'$collection=' . Database::SYSTEM_COLLECTION_CERTIFICATES,
|
||||
'domain=' . $domain->get(),
|
||||
],
|
||||
]);
|
||||
|
||||
|
@ -106,16 +104,18 @@ class CertificatesV1 extends Worker
|
|||
|
||||
$certificate = (!empty($certificate) && $certificate instanceof $certificate) ? $certificate->getArrayCopy() : [];
|
||||
|
||||
if(!empty($certificate)
|
||||
if (
|
||||
!empty($certificate)
|
||||
&& isset($certificate['issueDate'])
|
||||
&& (($certificate['issueDate'] + ($expiry)) > \time())) { // Check last issue time
|
||||
throw new Exception('Renew isn\'t required');
|
||||
&& (($certificate['issueDate'] + ($expiry)) > \time())
|
||||
) { // Check last issue time
|
||||
throw new Exception('Renew isn\'t required');
|
||||
}
|
||||
|
||||
$staging = (App::isProduction()) ? '' : ' --dry-run';
|
||||
$email = App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS');
|
||||
|
||||
if(empty($email)) {
|
||||
if (empty($email)) {
|
||||
throw new Exception('You must set a valid security email address (_APP_SYSTEM_SECURITY_EMAIL_ADDRESS) to issue an SSL certificate');
|
||||
}
|
||||
|
||||
|
@ -123,36 +123,36 @@ class CertificatesV1 extends Worker
|
|||
$stderr = '';
|
||||
|
||||
$exit = Console::execute("certbot certonly --webroot --noninteractive --agree-tos{$staging}"
|
||||
." --email ".$email
|
||||
." -w ".APP_STORAGE_CERTIFICATES
|
||||
." -d {$domain->get()}", '', $stdout, $stderr);
|
||||
. " --email " . $email
|
||||
. " -w " . APP_STORAGE_CERTIFICATES
|
||||
. " -d {$domain->get()}", '', $stdout, $stderr);
|
||||
|
||||
if($exit !== 0) {
|
||||
throw new Exception('Failed to issue a certificate with message: '.$stderr);
|
||||
if ($exit !== 0) {
|
||||
throw new Exception('Failed to issue a certificate with message: ' . $stderr);
|
||||
}
|
||||
|
||||
$path = APP_STORAGE_CERTIFICATES.'/'.$domain->get();
|
||||
$path = APP_STORAGE_CERTIFICATES . '/' . $domain->get();
|
||||
|
||||
if(!\is_readable($path)) {
|
||||
if (!\is_readable($path)) {
|
||||
if (!\mkdir($path, 0755, true)) {
|
||||
throw new Exception('Failed to create path...');
|
||||
}
|
||||
}
|
||||
|
||||
if(!@\rename('/etc/letsencrypt/live/'.$domain->get().'/cert.pem', APP_STORAGE_CERTIFICATES.'/'.$domain->get().'/cert.pem')) {
|
||||
throw new Exception('Failed to rename certificate cert.pem: '.\json_encode($stdout));
|
||||
|
||||
if (!@\rename('/etc/letsencrypt/live/' . $domain->get() . '/cert.pem', APP_STORAGE_CERTIFICATES . '/' . $domain->get() . '/cert.pem')) {
|
||||
throw new Exception('Failed to rename certificate cert.pem: ' . \json_encode($stdout));
|
||||
}
|
||||
|
||||
if(!@\rename('/etc/letsencrypt/live/'.$domain->get().'/chain.pem', APP_STORAGE_CERTIFICATES.'/'.$domain->get().'/chain.pem')) {
|
||||
throw new Exception('Failed to rename certificate chain.pem: '.\json_encode($stdout));
|
||||
if (!@\rename('/etc/letsencrypt/live/' . $domain->get() . '/chain.pem', APP_STORAGE_CERTIFICATES . '/' . $domain->get() . '/chain.pem')) {
|
||||
throw new Exception('Failed to rename certificate chain.pem: ' . \json_encode($stdout));
|
||||
}
|
||||
|
||||
if(!@\rename('/etc/letsencrypt/live/'.$domain->get().'/fullchain.pem', APP_STORAGE_CERTIFICATES.'/'.$domain->get().'/fullchain.pem')) {
|
||||
throw new Exception('Failed to rename certificate fullchain.pem: '.\json_encode($stdout));
|
||||
if (!@\rename('/etc/letsencrypt/live/' . $domain->get() . '/fullchain.pem', APP_STORAGE_CERTIFICATES . '/' . $domain->get() . '/fullchain.pem')) {
|
||||
throw new Exception('Failed to rename certificate fullchain.pem: ' . \json_encode($stdout));
|
||||
}
|
||||
|
||||
if(!@\rename('/etc/letsencrypt/live/'.$domain->get().'/privkey.pem', APP_STORAGE_CERTIFICATES.'/'.$domain->get().'/privkey.pem')) {
|
||||
throw new Exception('Failed to rename certificate privkey.pem: '.\json_encode($stdout));
|
||||
if (!@\rename('/etc/letsencrypt/live/' . $domain->get() . '/privkey.pem', APP_STORAGE_CERTIFICATES . '/' . $domain->get() . '/privkey.pem')) {
|
||||
throw new Exception('Failed to rename certificate privkey.pem: ' . \json_encode($stdout));
|
||||
}
|
||||
|
||||
$certificate = \array_merge($certificate, [
|
||||
|
@ -170,30 +170,30 @@ class CertificatesV1 extends Worker
|
|||
|
||||
$certificate = $consoleDB->createDocument($certificate);
|
||||
|
||||
if(!$certificate) {
|
||||
if (!$certificate) {
|
||||
throw new Exception('Failed saving certificate to DB');
|
||||
}
|
||||
|
||||
if(!empty($document)) {
|
||||
if (!empty($document)) {
|
||||
$document = \array_merge($document, [
|
||||
'updated' => \time(),
|
||||
'certificateId' => $certificate->getId(),
|
||||
]);
|
||||
|
||||
|
||||
$document = $consoleDB->updateDocument($document);
|
||||
|
||||
if(!$document) {
|
||||
|
||||
if (!$document) {
|
||||
throw new Exception('Failed saving domain to DB');
|
||||
}
|
||||
}
|
||||
|
||||
$config =
|
||||
"tls:
|
||||
|
||||
$config =
|
||||
"tls:
|
||||
certificates:
|
||||
- certFile: /storage/certificates/{$domain->get()}/fullchain.pem
|
||||
keyFile: /storage/certificates/{$domain->get()}/privkey.pem";
|
||||
|
||||
if(!\file_put_contents(APP_STORAGE_CONFIG.'/'.$domain->get().'.yml', $config)) {
|
||||
if (!\file_put_contents(APP_STORAGE_CONFIG . '/' . $domain->get() . '.yml', $config)) {
|
||||
throw new Exception('Failed to save SSL configuration');
|
||||
}
|
||||
|
||||
|
@ -210,4 +210,4 @@ class CertificatesV1 extends Worker
|
|||
public function shutdown(): void
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,16 +14,14 @@ use Utopia\Config\Config;
|
|||
use Utopia\Audit\Audit;
|
||||
use Utopia\Audit\Adapters\MySQL as AuditAdapter;
|
||||
|
||||
require_once __DIR__.'/../workers.php';
|
||||
require_once __DIR__ . '/../workers.php';
|
||||
|
||||
Console::title('Deletes V1 Worker');
|
||||
Console::success(APP_NAME.' deletes worker v1 has started'."\n");
|
||||
Console::success(APP_NAME . ' deletes worker v1 has started' . "\n");
|
||||
|
||||
class DeletesV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
|
||||
protected $consoleDB = null;
|
||||
protected Database $consoleDB;
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
|
@ -31,9 +29,9 @@ class DeletesV1 extends Worker
|
|||
|
||||
public function run(): void
|
||||
{
|
||||
$projectId = isset($this->args['projectId']) ? $this->args['projectId'] : '';
|
||||
$projectId = isset($this->args['projectId']) ? $this->args['projectId'] : '';
|
||||
$type = $this->args['type'];
|
||||
|
||||
|
||||
switch (strval($type)) {
|
||||
case DELETE_TYPE_DOCUMENT:
|
||||
$document = $this->args['document'];
|
||||
|
@ -55,7 +53,7 @@ class DeletesV1 extends Worker
|
|||
$this->deleteMemberships($document, $projectId);
|
||||
break;
|
||||
default:
|
||||
Console::error('No lazy delete operation available for document of type: '.$document->getCollection());
|
||||
Console::error('No lazy delete operation available for document of type: ' . $document->getCollection());
|
||||
break;
|
||||
}
|
||||
break;
|
||||
|
@ -76,33 +74,33 @@ class DeletesV1 extends Worker
|
|||
$document = new Document($this->args['document']);
|
||||
$this->deleteCertificates($document);
|
||||
break;
|
||||
|
||||
|
||||
default:
|
||||
Console::error('No delete operation for type: '.$type);
|
||||
Console::error('No delete operation for type: ' . $type);
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function shutdown(): void
|
||||
{
|
||||
}
|
||||
|
||||
protected function deleteDocuments(Document $document, $projectId)
|
||||
|
||||
protected function deleteDocuments(Document $document, $projectId)
|
||||
{
|
||||
$collectionId = $document->getId();
|
||||
|
||||
|
||||
// Delete Documents in the deleted collection
|
||||
$this->deleteByGroup([
|
||||
'$collection='.$collectionId
|
||||
], $this->getProjectDB($projectId));
|
||||
'$collection=' . $collectionId
|
||||
], $this->getProjectDB($projectId));
|
||||
}
|
||||
|
||||
protected function deleteMemberships(Document $document, $projectId) {
|
||||
protected function deleteMemberships(Document $document, $projectId)
|
||||
{
|
||||
// Delete Memberships
|
||||
$this->deleteByGroup([
|
||||
'$collection='.Database::SYSTEM_COLLECTION_MEMBERSHIPS,
|
||||
'teamId='.$document->getId(),
|
||||
'$collection=' . Database::SYSTEM_COLLECTION_MEMBERSHIPS,
|
||||
'teamId=' . $document->getId(),
|
||||
], $this->getProjectDB($projectId));
|
||||
}
|
||||
|
||||
|
@ -110,8 +108,8 @@ class DeletesV1 extends Worker
|
|||
{
|
||||
// Delete all DBs
|
||||
$this->getConsoleDB()->deleteNamespace($document->getId());
|
||||
$uploads = new Local(APP_STORAGE_UPLOADS.'/app-'.$document->getId());
|
||||
$cache = new Local(APP_STORAGE_CACHE.'/app-'.$document->getId());
|
||||
$uploads = new Local(APP_STORAGE_UPLOADS . '/app-' . $document->getId());
|
||||
$cache = new Local(APP_STORAGE_CACHE . '/app-' . $document->getId());
|
||||
|
||||
// Delete all storage directories
|
||||
$uploads->delete($uploads->getRoot(), true);
|
||||
|
@ -121,7 +119,7 @@ class DeletesV1 extends Worker
|
|||
protected function deleteUser(Document $document, $projectId)
|
||||
{
|
||||
$tokens = $document->getAttribute('tokens', []);
|
||||
|
||||
|
||||
foreach ($tokens as $token) {
|
||||
if (!$this->getProjectDB($projectId)->deleteDocument($token->getId())) {
|
||||
throw new Exception('Failed to remove token from DB');
|
||||
|
@ -138,14 +136,14 @@ class DeletesV1 extends Worker
|
|||
|
||||
// Delete Memberships and decrement team membership counts
|
||||
$this->deleteByGroup([
|
||||
'$collection='.Database::SYSTEM_COLLECTION_MEMBERSHIPS,
|
||||
'userId='.$document->getId(),
|
||||
], $this->getProjectDB($projectId), function(Document $document) use ($projectId) {
|
||||
'$collection=' . Database::SYSTEM_COLLECTION_MEMBERSHIPS,
|
||||
'userId=' . $document->getId(),
|
||||
], $this->getProjectDB($projectId), function (Document $document) use ($projectId) {
|
||||
|
||||
if ($document->getAttribute('confirm')) { // Count only confirmed members
|
||||
$teamId = $document->getAttribute('teamId');
|
||||
$team = $this->getProjectDB($projectId)->getDocument($teamId);
|
||||
if(!$team->isEmpty()) {
|
||||
if (!$team->isEmpty()) {
|
||||
$team = $this->getProjectDB($projectId)->updateDocument(\array_merge($team->getArrayCopy(), [
|
||||
'sum' => \max($team->getAttribute('sum', 0) - 1, 0), // Ensure that sum >= 0
|
||||
]));
|
||||
|
@ -154,37 +152,37 @@ class DeletesV1 extends Worker
|
|||
});
|
||||
}
|
||||
|
||||
protected function deleteExecutionLogs($timestamp)
|
||||
protected function deleteExecutionLogs($timestamp)
|
||||
{
|
||||
$this->deleteForProjectIds(function($projectId) use ($timestamp) {
|
||||
$this->deleteForProjectIds(function ($projectId) use ($timestamp) {
|
||||
if (!($projectDB = $this->getProjectDB($projectId))) {
|
||||
throw new Exception('Failed to get projectDB for project '.$projectId);
|
||||
throw new Exception('Failed to get projectDB for project ' . $projectId);
|
||||
}
|
||||
|
||||
// Delete Executions
|
||||
$this->deleteByGroup([
|
||||
'$collection='.Database::SYSTEM_COLLECTION_EXECUTIONS,
|
||||
'dateCreated<'.$timestamp
|
||||
'$collection=' . Database::SYSTEM_COLLECTION_EXECUTIONS,
|
||||
'dateCreated<' . $timestamp
|
||||
], $projectDB);
|
||||
});
|
||||
}
|
||||
|
||||
protected function deleteAbuseLogs($timestamp)
|
||||
protected function deleteAbuseLogs($timestamp)
|
||||
{
|
||||
global $register;
|
||||
if($timestamp == 0) {
|
||||
if ($timestamp == 0) {
|
||||
throw new Exception('Failed to delete audit logs. No timestamp provided');
|
||||
}
|
||||
|
||||
$timeLimit = new TimeLimit("", 0, 1, $register->get('db'));
|
||||
|
||||
$this->deleteForProjectIds(function($projectId) use ($timeLimit, $timestamp){
|
||||
$timeLimit->setNamespace('app_'.$projectId);
|
||||
$abuse = new Abuse($timeLimit);
|
||||
$this->deleteForProjectIds(function ($projectId) use ($timeLimit, $timestamp) {
|
||||
$timeLimit->setNamespace('app_' . $projectId);
|
||||
$abuse = new Abuse($timeLimit);
|
||||
|
||||
$status = $abuse->cleanup($timestamp);
|
||||
if (!$status) {
|
||||
throw new Exception('Failed to delete Abuse logs for project '.$projectId);
|
||||
throw new Exception('Failed to delete Abuse logs for project ' . $projectId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -192,16 +190,16 @@ class DeletesV1 extends Worker
|
|||
protected function deleteAuditLogs($timestamp)
|
||||
{
|
||||
global $register;
|
||||
if($timestamp == 0) {
|
||||
if ($timestamp == 0) {
|
||||
throw new Exception('Failed to delete audit logs. No timestamp provided');
|
||||
}
|
||||
$this->deleteForProjectIds(function($projectId) use ($register, $timestamp){
|
||||
$this->deleteForProjectIds(function ($projectId) use ($register, $timestamp) {
|
||||
$adapter = new AuditAdapter($register->get('db'));
|
||||
$adapter->setNamespace('app_'.$projectId);
|
||||
$adapter->setNamespace('app_' . $projectId);
|
||||
$audit = new Audit($adapter);
|
||||
$status = $audit->cleanup($timestamp);
|
||||
if (!$status) {
|
||||
throw new Exception('Failed to delete Audit logs for project'.$projectId);
|
||||
throw new Exception('Failed to delete Audit logs for project' . $projectId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -209,26 +207,25 @@ class DeletesV1 extends Worker
|
|||
protected function deleteFunction(Document $document, $projectId)
|
||||
{
|
||||
$projectDB = $this->getProjectDB($projectId);
|
||||
$device = new Local(APP_STORAGE_FUNCTIONS.'/app-'.$projectId);
|
||||
$device = new Local(APP_STORAGE_FUNCTIONS . '/app-' . $projectId);
|
||||
|
||||
// Delete Tags
|
||||
$this->deleteByGroup([
|
||||
'$collection='.Database::SYSTEM_COLLECTION_TAGS,
|
||||
'functionId='.$document->getId(),
|
||||
], $projectDB, function(Document $document) use ($device) {
|
||||
'$collection=' . Database::SYSTEM_COLLECTION_TAGS,
|
||||
'functionId=' . $document->getId(),
|
||||
], $projectDB, function (Document $document) use ($device) {
|
||||
|
||||
if ($device->delete($document->getAttribute('path', ''))) {
|
||||
Console::success('Delete code tag: '.$document->getAttribute('path', ''));
|
||||
}
|
||||
else {
|
||||
Console::error('Failed to delete code tag: '.$document->getAttribute('path', ''));
|
||||
Console::success('Delete code tag: ' . $document->getAttribute('path', ''));
|
||||
} else {
|
||||
Console::error('Failed to delete code tag: ' . $document->getAttribute('path', ''));
|
||||
}
|
||||
});
|
||||
|
||||
// Delete Executions
|
||||
$this->deleteByGroup([
|
||||
'$collection='.Database::SYSTEM_COLLECTION_EXECUTIONS,
|
||||
'functionId='.$document->getId(),
|
||||
'$collection=' . Database::SYSTEM_COLLECTION_EXECUTIONS,
|
||||
'functionId=' . $document->getId(),
|
||||
], $projectDB);
|
||||
}
|
||||
|
||||
|
@ -236,17 +233,16 @@ class DeletesV1 extends Worker
|
|||
{
|
||||
Authorization::disable();
|
||||
|
||||
if($database->deleteDocument($document->getId())) {
|
||||
Console::success('Deleted document "'.$document->getId().'" successfully');
|
||||
if ($database->deleteDocument($document->getId())) {
|
||||
Console::success('Deleted document "' . $document->getId() . '" successfully');
|
||||
|
||||
if(is_callable($callback)) {
|
||||
if (is_callable($callback)) {
|
||||
$callback($document);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
Console::error('Failed to delete document: '.$document->getId());
|
||||
} else {
|
||||
Console::error('Failed to delete document: ' . $document->getId());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -262,8 +258,8 @@ class DeletesV1 extends Worker
|
|||
$sum = $limit;
|
||||
|
||||
$executionStart = \microtime(true);
|
||||
|
||||
while($sum === $limit) {
|
||||
|
||||
while ($sum === $limit) {
|
||||
$chunk++;
|
||||
|
||||
Authorization::disable();
|
||||
|
@ -272,18 +268,18 @@ class DeletesV1 extends Worker
|
|||
'orderType' => 'ASC',
|
||||
'orderCast' => 'string',
|
||||
'filters' => [
|
||||
'$collection='.Database::SYSTEM_COLLECTION_PROJECTS,
|
||||
'$collection=' . Database::SYSTEM_COLLECTION_PROJECTS,
|
||||
],
|
||||
]);
|
||||
Authorization::reset();
|
||||
|
||||
$projectIds = array_map (function ($project) {
|
||||
return $project->getId();
|
||||
$projectIds = array_map(function ($project) {
|
||||
return $project->getId();
|
||||
}, $projects);
|
||||
|
||||
$sum = count($projects);
|
||||
|
||||
Console::info('Executing delete function for chunk #'.$chunk.'. Found '.$sum.' projects');
|
||||
Console::info('Executing delete function for chunk #' . $chunk . '. Found ' . $sum . ' projects');
|
||||
foreach ($projectIds as $projectId) {
|
||||
$callback($projectId);
|
||||
$count++;
|
||||
|
@ -303,8 +299,8 @@ class DeletesV1 extends Worker
|
|||
$sum = $limit;
|
||||
|
||||
$executionStart = \microtime(true);
|
||||
|
||||
while($sum === $limit) {
|
||||
|
||||
while ($sum === $limit) {
|
||||
$chunk++;
|
||||
|
||||
Authorization::disable();
|
||||
|
@ -321,7 +317,7 @@ class DeletesV1 extends Worker
|
|||
|
||||
$sum = count($results);
|
||||
|
||||
Console::info('Deleting chunk #'.$chunk.'. Found '.$sum.' documents');
|
||||
Console::info('Deleting chunk #' . $chunk . '. Found ' . $sum . ' documents');
|
||||
|
||||
foreach ($results as $document) {
|
||||
$this->deleteById($document, $database, $callback);
|
||||
|
@ -340,8 +336,8 @@ class DeletesV1 extends Worker
|
|||
$directory = APP_STORAGE_CERTIFICATES . '/' . $domain;
|
||||
$checkTraversal = realpath($directory) === $directory;
|
||||
|
||||
if($domain && $checkTraversal && is_dir($directory)) {
|
||||
array_map('unlink', glob($directory.'/*.*'));
|
||||
if ($domain && $checkTraversal && is_dir($directory)) {
|
||||
array_map('unlink', glob($directory . '/*.*'));
|
||||
rmdir($directory);
|
||||
Console::info("Deleted certificate files for {$domain}");
|
||||
} else {
|
||||
|
@ -350,7 +346,8 @@ class DeletesV1 extends Worker
|
|||
}
|
||||
|
||||
/**
|
||||
* @return Database;
|
||||
* @return Database
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function getConsoleDB(): Database
|
||||
{
|
||||
|
@ -359,7 +356,7 @@ class DeletesV1 extends Worker
|
|||
$db = $register->get('db');
|
||||
$cache = $register->get('cache');
|
||||
|
||||
if($this->consoleDB === null) {
|
||||
if (!isset($this->consoleDB)) {
|
||||
$this->consoleDB = new Database();
|
||||
$this->consoleDB->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));;
|
||||
$this->consoleDB->setNamespace('app_console'); // Main DB
|
||||
|
@ -370,9 +367,11 @@ class DeletesV1 extends Worker
|
|||
}
|
||||
|
||||
/**
|
||||
* @return Database;
|
||||
* @param string $projectId
|
||||
* @return Database
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function getProjectDB($projectId): Database
|
||||
protected function getProjectDB(string $projectId): Database
|
||||
{
|
||||
global $register;
|
||||
|
||||
|
@ -381,9 +380,9 @@ class DeletesV1 extends Worker
|
|||
|
||||
$projectDB = new Database();
|
||||
$projectDB->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
|
||||
$projectDB->setNamespace('app_'.$projectId); // Main DB
|
||||
$projectDB->setNamespace('app_' . $projectId); // Main DB
|
||||
$projectDB->setMocks(Config::getParam('collections', []));
|
||||
|
||||
return $projectDB;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,12 +19,12 @@ use Utopia\Orchestration\Container;
|
|||
use Utopia\Orchestration\Exception\Orchestration as OrchestrationException;
|
||||
use Utopia\Orchestration\Exception\Timeout as TimeoutException;
|
||||
|
||||
require_once __DIR__.'/../workers.php';
|
||||
require_once __DIR__ . '/../workers.php';
|
||||
|
||||
Runtime::enableCoroutine(0);
|
||||
|
||||
Console::title('Functions V1 Worker');
|
||||
Console::success(APP_NAME.' functions worker v1 has started');
|
||||
Console::success(APP_NAME . ' functions worker v1 has started');
|
||||
|
||||
$runtimes = Config::getParam('runtimes');
|
||||
|
||||
|
@ -38,11 +38,11 @@ $orchestration = new Orchestration(new DockerAPI($dockerUser, $dockerPass, $dock
|
|||
*/
|
||||
$warmupStart = \microtime(true);
|
||||
|
||||
Co\run(function() use ($runtimes, $orchestration) { // Warmup: make sure images are ready to run fast 🚀
|
||||
foreach($runtimes as $runtime) {
|
||||
go(function() use ($runtime, $orchestration) {
|
||||
Console::info('Warming up '.$runtime['name'].' '.$runtime['version'].' environment...');
|
||||
|
||||
Co\run(function () use ($runtimes, $orchestration) { // Warmup: make sure images are ready to run fast 🚀
|
||||
foreach ($runtimes as $runtime) {
|
||||
go(function () use ($runtime, $orchestration) {
|
||||
Console::info('Warming up ' . $runtime['name'] . ' ' . $runtime['version'] . ' environment...');
|
||||
|
||||
$response = $orchestration->pull($runtime['image']);
|
||||
|
||||
if ($response) {
|
||||
|
@ -57,7 +57,7 @@ Co\run(function() use ($runtimes, $orchestration) { // Warmup: make sure images
|
|||
$warmupEnd = \microtime(true);
|
||||
$warmupTime = $warmupEnd - $warmupStart;
|
||||
|
||||
Console::success('Finished warmup in '.$warmupTime.' seconds');
|
||||
Console::success('Finished warmup in ' . $warmupTime . ' seconds');
|
||||
|
||||
/**
|
||||
* List function servers
|
||||
|
@ -68,7 +68,7 @@ $stderr = '';
|
|||
$executionStart = \microtime(true);
|
||||
|
||||
$response = $orchestration->list(['label' => 'appwrite-type=function']);
|
||||
|
||||
/** @var Container[] $list */
|
||||
$list = [];
|
||||
|
||||
foreach ($response as $value) {
|
||||
|
@ -77,7 +77,7 @@ foreach ($response as $value) {
|
|||
|
||||
$executionEnd = \microtime(true);
|
||||
|
||||
Console::info(count($list).' functions listed in ' . ($executionEnd - $executionStart) . ' seconds');
|
||||
Console::info(count($list) . ' functions listed in ' . ($executionEnd - $executionStart) . ' seconds');
|
||||
|
||||
/**
|
||||
* 1. Get event args - DONE
|
||||
|
@ -96,9 +96,9 @@ Console::info(count($list).' functions listed in ' . ($executionEnd - $execution
|
|||
|
||||
class FunctionsV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
public array $args = [];
|
||||
|
||||
public $allowed = [];
|
||||
public array $allowed = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
|
@ -125,7 +125,7 @@ class FunctionsV1 extends Worker
|
|||
|
||||
$database = new Database();
|
||||
$database->setAdapter(new RedisAdapter(new MySQLAdapter($db, $cache), $cache));
|
||||
$database->setNamespace('app_'.$projectId);
|
||||
$database->setNamespace('app_' . $projectId);
|
||||
$database->setMocks(Config::getParam('collections', []));
|
||||
|
||||
switch ($trigger) {
|
||||
|
@ -133,7 +133,8 @@ class FunctionsV1 extends Worker
|
|||
$limit = 30;
|
||||
$sum = 30;
|
||||
$offset = 0;
|
||||
$functions = []; /** @var Document[] $functions */
|
||||
$functions = [];
|
||||
/** @var Document[] $functions */
|
||||
|
||||
while ($sum >= $limit) {
|
||||
|
||||
|
@ -146,7 +147,7 @@ class FunctionsV1 extends Worker
|
|||
'orderType' => 'ASC',
|
||||
'orderCast' => 'string',
|
||||
'filters' => [
|
||||
'$collection='.Database::SYSTEM_COLLECTION_FUNCTIONS,
|
||||
'$collection=' . Database::SYSTEM_COLLECTION_FUNCTIONS,
|
||||
],
|
||||
]);
|
||||
|
||||
|
@ -155,21 +156,33 @@ class FunctionsV1 extends Worker
|
|||
$sum = \count($functions);
|
||||
$offset = $offset + $limit;
|
||||
|
||||
Console::log('Fetched '.$sum.' functions...');
|
||||
Console::log('Fetched ' . $sum . ' functions...');
|
||||
|
||||
foreach($functions as $function) {
|
||||
foreach ($functions as $function) {
|
||||
$events = $function->getAttribute('events', []);
|
||||
$tag = $function->getAttribute('tag', []);
|
||||
|
||||
Console::success('Itterating function: '.$function->getAttribute('name'));
|
||||
Console::success('Itterating function: ' . $function->getAttribute('name'));
|
||||
|
||||
if(!\in_array($event, $events) || empty($tag)) {
|
||||
if (!\in_array($event, $events) || empty($tag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Console::success('Triggered function: '.$event);
|
||||
Console::success('Triggered function: ' . $event);
|
||||
|
||||
$this->execute('event', $projectId, '', $database, $function, $event, $eventData, $data, $webhooks, $userId, $jwt);
|
||||
$this->execute(
|
||||
trigger: 'event',
|
||||
projectId: $projectId,
|
||||
executionId: '',
|
||||
database: $database,
|
||||
function: $function,
|
||||
event: $event,
|
||||
eventData: $eventData,
|
||||
data: $data,
|
||||
webhooks: $webhooks,
|
||||
userId: $userId,
|
||||
jwt: $jwt
|
||||
);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -194,10 +207,10 @@ class FunctionsV1 extends Worker
|
|||
Authorization::reset();
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
throw new Exception('Function not found ('.$functionId.')');
|
||||
throw new Exception('Function not found (' . $functionId . ')');
|
||||
}
|
||||
|
||||
if($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
|
||||
if ($scheduleOriginal && $scheduleOriginal !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -206,8 +219,7 @@ class FunctionsV1 extends Worker
|
|||
|
||||
$function
|
||||
->setAttribute('scheduleNext', $next)
|
||||
->setAttribute('schedulePrevious', \time())
|
||||
;
|
||||
->setAttribute('schedulePrevious', \time());
|
||||
|
||||
Authorization::disable();
|
||||
|
||||
|
@ -215,6 +227,10 @@ class FunctionsV1 extends Worker
|
|||
'scheduleNext' => $next,
|
||||
]));
|
||||
|
||||
if ($function === false) {
|
||||
throw new Exception('Function update failed (' . $functionId . ')');
|
||||
}
|
||||
|
||||
Authorization::reset();
|
||||
|
||||
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
|
||||
|
@ -226,7 +242,17 @@ class FunctionsV1 extends Worker
|
|||
'scheduleOriginal' => $function->getAttribute('schedule', ''),
|
||||
]); // Async task rescheduale
|
||||
|
||||
$this->execute($trigger, $projectId, $executionId, $database, $function, /*$event*/'', /*$eventData*/'', $data, $webhooks, $userId, $jwt);
|
||||
$this->execute(
|
||||
trigger: $trigger,
|
||||
projectId: $projectId,
|
||||
executionId: $executionId,
|
||||
database: $database,
|
||||
function: $function,
|
||||
data: $data,
|
||||
webhooks: $webhooks,
|
||||
userId: $userId,
|
||||
jwt: $jwt
|
||||
);
|
||||
break;
|
||||
|
||||
case 'http':
|
||||
|
@ -235,14 +261,20 @@ class FunctionsV1 extends Worker
|
|||
Authorization::reset();
|
||||
|
||||
if (empty($function->getId()) || Database::SYSTEM_COLLECTION_FUNCTIONS != $function->getCollection()) {
|
||||
throw new Exception('Function not found ('.$functionId.')');
|
||||
throw new Exception('Function not found (' . $functionId . ')');
|
||||
}
|
||||
|
||||
$this->execute($trigger, $projectId, $executionId, $database, $function, /*$event*/'', /*$eventData*/'', $data, $webhooks, $userId, $jwt);
|
||||
break;
|
||||
|
||||
default:
|
||||
# code...
|
||||
$this->execute(
|
||||
trigger: $trigger,
|
||||
projectId: $projectId,
|
||||
executionId: $executionId,
|
||||
database: $database,
|
||||
function: $function,
|
||||
data: $data,
|
||||
webhooks: $webhooks,
|
||||
userId: $userId,
|
||||
jwt: $jwt
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -254,7 +286,7 @@ class FunctionsV1 extends Worker
|
|||
* @param string $projectId
|
||||
* @param string $executionId
|
||||
* @param Database $database
|
||||
* @param Database $function
|
||||
* @param Document $function
|
||||
* @param string $event
|
||||
* @param string $eventData
|
||||
* @param string $data
|
||||
|
@ -275,7 +307,7 @@ class FunctionsV1 extends Worker
|
|||
$tag = $database->getDocument($function->getAttribute('tag', ''));
|
||||
Authorization::reset();
|
||||
|
||||
if($tag->getAttribute('functionId') !== $function->getId()) {
|
||||
if ($tag->getAttribute('functionId') !== $function->getId()) {
|
||||
throw new Exception('Tag not found', 404);
|
||||
}
|
||||
|
||||
|
@ -297,18 +329,18 @@ class FunctionsV1 extends Worker
|
|||
'time' => 0,
|
||||
]);
|
||||
|
||||
if(false === $execution || ($execution instanceof Document && $execution->isEmpty())) {
|
||||
if ($execution->isEmpty()) {
|
||||
throw new Exception('Failed to create or read execution');
|
||||
}
|
||||
|
||||
|
||||
Authorization::reset();
|
||||
|
||||
$runtime = (isset($runtimes[$function->getAttribute('runtime', '')]))
|
||||
? $runtimes[$function->getAttribute('runtime', '')]
|
||||
: null;
|
||||
|
||||
if(\is_null($runtime)) {
|
||||
throw new Exception('Runtime "'.$function->getAttribute('runtime', '').'" is not supported');
|
||||
if (\is_null($runtime)) {
|
||||
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
|
||||
}
|
||||
|
||||
$vars = \array_merge($function->getAttribute('vars', []), [
|
||||
|
@ -325,37 +357,37 @@ class FunctionsV1 extends Worker
|
|||
'APPWRITE_FUNCTION_JWT' => $jwt,
|
||||
'APPWRITE_FUNCTION_PROJECT_ID' => $projectId,
|
||||
]);
|
||||
|
||||
$tagId = $tag->getId() ?? '';
|
||||
$tagPath = $tag->getAttribute('path', '');
|
||||
$tagPathTarget = '/tmp/project-'.$projectId.'/'.$tag->getId().'/code.tar.gz';
|
||||
$tagPathTarget = '/tmp/project-' . $projectId . '/' . $tagId . '/code.tar.gz';
|
||||
$tagPathTargetDir = \pathinfo($tagPathTarget, PATHINFO_DIRNAME);
|
||||
$container = 'appwrite-function-'.$tag->getId();
|
||||
$container = 'appwrite-function-' . $tagId;
|
||||
$command = \escapeshellcmd($tag->getAttribute('command', ''));
|
||||
|
||||
if(!\is_readable($tagPath)) {
|
||||
throw new Exception('Code is not readable: '.$tag->getAttribute('path', ''));
|
||||
if (!\is_readable($tagPath)) {
|
||||
throw new Exception('Code is not readable: ' . $tag->getAttribute('path', ''));
|
||||
}
|
||||
|
||||
if (!\file_exists($tagPathTargetDir)) {
|
||||
if (!\mkdir($tagPathTargetDir, 0755, true)) {
|
||||
throw new Exception('Can\'t create directory '.$tagPathTargetDir);
|
||||
}
|
||||
}
|
||||
|
||||
if (!\file_exists($tagPathTarget)) {
|
||||
if(!\copy($tagPath, $tagPathTarget)) {
|
||||
throw new Exception('Can\'t create temporary code file '.$tagPathTarget);
|
||||
throw new Exception('Can\'t create directory ' . $tagPathTargetDir);
|
||||
}
|
||||
}
|
||||
|
||||
if(isset($list[$container]) && !(\substr($list[$container]->getStatus(), 0, 2) === 'Up')) { // Remove conatiner if not online
|
||||
if (!\file_exists($tagPathTarget)) {
|
||||
if (!\copy($tagPath, $tagPathTarget)) {
|
||||
throw new Exception('Can\'t create temporary code file ' . $tagPathTarget);
|
||||
}
|
||||
}
|
||||
|
||||
if (isset($list[$container]) && !(\substr($list[$container]->getStatus(), 0, 2) === 'Up')) { // Remove conatiner if not online
|
||||
$stdout = '';
|
||||
$stderr = '';
|
||||
|
||||
try {
|
||||
$orchestration->remove($container);
|
||||
} catch (Exception $e) {
|
||||
Console::warning('Failed to remove container: '.$e->getMessage());
|
||||
Console::warning('Failed to remove container: ' . $e->getMessage());
|
||||
}
|
||||
|
||||
unset($list[$container]);
|
||||
|
@ -370,10 +402,10 @@ class FunctionsV1 extends Worker
|
|||
* Make sure no access to NFS server / storage volumes
|
||||
* Access Appwrite REST from internal network for improved performance
|
||||
*/
|
||||
if(!isset($list[$container])) { // Create contianer if not ready
|
||||
if (!isset($list[$container])) { // Create contianer if not ready
|
||||
$stdout = '';
|
||||
$stderr = '';
|
||||
|
||||
|
||||
$executionStart = \microtime(true);
|
||||
$executionTime = \time();
|
||||
|
||||
|
@ -381,16 +413,17 @@ class FunctionsV1 extends Worker
|
|||
$orchestration->setMemory(App::getEnv('_APP_FUNCTIONS_MEMORY', '256'));
|
||||
$orchestration->setSwap(App::getEnv('_APP_FUNCTIONS_MEMORY_SWAP', '256'));
|
||||
|
||||
foreach($vars as &$value) {
|
||||
foreach ($vars as &$value) {
|
||||
$value = strval($value);
|
||||
}
|
||||
|
||||
$id = $orchestration->run(
|
||||
image: $runtime['image'],
|
||||
name: $container,
|
||||
command: ['tail',
|
||||
'-f',
|
||||
'/dev/null'
|
||||
command: [
|
||||
'tail',
|
||||
'-f',
|
||||
'/dev/null'
|
||||
],
|
||||
entrypoint: '',
|
||||
workdir: '/usr/local/src',
|
||||
|
@ -400,38 +433,43 @@ class FunctionsV1 extends Worker
|
|||
labels: [
|
||||
'appwrite-type' => 'function',
|
||||
'appwrite-created' => strval($executionTime)
|
||||
]);
|
||||
]
|
||||
);
|
||||
|
||||
$untarStdout = '';
|
||||
$untarStderr = '';
|
||||
|
||||
$untarSuccess = $orchestration->execute(
|
||||
name: $container,
|
||||
name: $container,
|
||||
command: [
|
||||
'sh',
|
||||
'-c',
|
||||
'mv /tmp/code.tar.gz /usr/local/src/code.tar.gz && tar -zxf /usr/local/src/code.tar.gz --strip 1 && rm /usr/local/src/code.tar.gz'
|
||||
],
|
||||
stdout: $untarStdout,
|
||||
stdout: $untarStdout,
|
||||
stderr: $untarStderr,
|
||||
vars: $vars,
|
||||
timeout: 60);
|
||||
timeout: 60
|
||||
);
|
||||
|
||||
if (!$untarSuccess) {
|
||||
throw new Exception('Failed to extract tar: '.$untarStderr);
|
||||
throw new Exception('Failed to extract tar: ' . $untarStderr);
|
||||
}
|
||||
|
||||
$executionEnd = \microtime(true);
|
||||
|
||||
$list[$container] = new Container($container, $id, 'Up',
|
||||
$list[$container] = new Container(
|
||||
$container,
|
||||
$id,
|
||||
'Up',
|
||||
[
|
||||
'appwrite-type' => 'function',
|
||||
'appwrite-created' => strval($executionTime),
|
||||
]);
|
||||
]
|
||||
);
|
||||
|
||||
Console::info('Function created in ' . ($executionEnd - $executionStart) . ' seconds');
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
Console::info('Container is ready to run');
|
||||
}
|
||||
|
||||
|
@ -444,12 +482,13 @@ class FunctionsV1 extends Worker
|
|||
|
||||
try {
|
||||
$exitCode = (int)!$orchestration->execute(
|
||||
name: $container,
|
||||
command: $orchestration->parseCommandString($command),
|
||||
stdout: $stdout,
|
||||
stderr: $stderr,
|
||||
vars: $vars,
|
||||
timeout: $function->getAttribute('timeout', (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900)));
|
||||
name: $container,
|
||||
command: $orchestration->parseCommandString($command),
|
||||
stdout: $stdout,
|
||||
stderr: $stderr,
|
||||
vars: $vars,
|
||||
timeout: $function->getAttribute('timeout', (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900))
|
||||
);
|
||||
} catch (TimeoutException $e) {
|
||||
$exitCode = 124;
|
||||
} catch (OrchestrationException $e) {
|
||||
|
@ -473,10 +512,10 @@ class FunctionsV1 extends Worker
|
|||
'stderr' => \mb_substr($stderr, -4000), // log last 4000 chars output
|
||||
'time' => $executionTime
|
||||
]));
|
||||
|
||||
|
||||
Authorization::reset();
|
||||
|
||||
if (false === $function) {
|
||||
if ($execution === false) {
|
||||
throw new Exception('Failed saving execution to DB', 500);
|
||||
}
|
||||
|
||||
|
@ -501,10 +540,9 @@ class FunctionsV1 extends Worker
|
|||
->setParam('functionStatus', $functionStatus)
|
||||
->setParam('functionExecutionTime', $executionTime * 1000) // ms
|
||||
->setParam('networkRequestSize', 0)
|
||||
->setParam('networkResponseSize', 0)
|
||||
;
|
||||
|
||||
if(App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
->setParam('networkResponseSize', 0);
|
||||
|
||||
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
$usage->trigger();
|
||||
}
|
||||
|
||||
|
@ -518,28 +556,30 @@ class FunctionsV1 extends Worker
|
|||
*/
|
||||
public function cleanup(): void
|
||||
{
|
||||
/** @var Container[] $list */
|
||||
global $list;
|
||||
/** @var Orchestration $orchestration */
|
||||
global $orchestration;
|
||||
|
||||
Console::success(count($list).' running containers counted');
|
||||
Console::success(count($list) . ' running containers counted');
|
||||
|
||||
$max = (int) App::getEnv('_APP_FUNCTIONS_CONTAINERS');
|
||||
|
||||
if(\count($list) > $max) {
|
||||
if (\count($list) > $max) {
|
||||
Console::info('Starting containers cleanup');
|
||||
|
||||
\uasort($list, function ($item1, $item2) {
|
||||
\uasort($list, function (Container $item1, Container $item2) {
|
||||
return (int)($item1->getLabels['appwrite-created'] ?? 0) <=> (int)($item2->getLabels['appwrite-created'] ?? 0);
|
||||
});
|
||||
|
||||
while(\count($list) > $max) {
|
||||
while (\count($list) > $max) {
|
||||
$first = \array_shift($list);
|
||||
|
||||
try {
|
||||
$orchestration->remove($first->getName(), true);
|
||||
Console::info('Removed container: '.$first->getName());
|
||||
Console::info('Removed container: ' . $first->getName());
|
||||
} catch (Exception $e) {
|
||||
Console::error('Failed to remove container: '.$e);
|
||||
Console::error('Failed to remove container: ' . $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -554,7 +594,7 @@ class FunctionsV1 extends Worker
|
|||
*/
|
||||
public function filterEnvKey(string $string): string
|
||||
{
|
||||
if(empty($this->allowed)) {
|
||||
if (empty($this->allowed)) {
|
||||
$this->allowed = array_fill_keys(\str_split('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz_'), true);
|
||||
}
|
||||
|
||||
|
@ -562,7 +602,7 @@ class FunctionsV1 extends Worker
|
|||
$output = '';
|
||||
|
||||
foreach ($string as $char) {
|
||||
if(\array_key_exists($char, $this->allowed)) {
|
||||
if (\array_key_exists($char, $this->allowed)) {
|
||||
$output .= $char;
|
||||
}
|
||||
}
|
||||
|
@ -573,4 +613,4 @@ class FunctionsV1 extends Worker
|
|||
public function shutdown(): void
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,11 +13,6 @@ Console::success(APP_NAME . ' mails worker v1 has started' . "\n");
|
|||
|
||||
class MailsV1 extends Worker
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
|
|
@ -10,18 +10,13 @@ use Utopia\App;
|
|||
use Utopia\CLI\Console;
|
||||
use Utopia\Config\Config;
|
||||
|
||||
require_once __DIR__.'/../workers.php';
|
||||
require_once __DIR__ . '/../workers.php';
|
||||
|
||||
Console::title('Tasks V1 Worker');
|
||||
Console::success(APP_NAME.' tasks worker v1 has started');
|
||||
Console::success(APP_NAME . ' tasks worker v1 has started');
|
||||
|
||||
class TasksV1 extends Worker
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
@ -91,8 +86,7 @@ class TasksV1 extends Worker
|
|||
|
||||
$task
|
||||
->setAttribute('next', $next)
|
||||
->setAttribute('previous', \time())
|
||||
;
|
||||
->setAttribute('previous', \time());
|
||||
|
||||
ResqueScheduler::enqueueAt($next, 'v1-tasks', 'TasksV1', $task->getArrayCopy()); // Async task rescheduale
|
||||
|
||||
|
@ -106,7 +100,8 @@ class TasksV1 extends Worker
|
|||
\curl_setopt($ch, CURLOPT_POSTFIELDS, '');
|
||||
\curl_setopt($ch, CURLOPT_HEADER, 0);
|
||||
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
|
||||
\curl_setopt($ch, CURLOPT_USERAGENT, \sprintf(APP_USERAGENT,
|
||||
\curl_setopt($ch, CURLOPT_USERAGENT, \sprintf(
|
||||
APP_USERAGENT,
|
||||
App::getEnv('_APP_VERSION', 'UNKNOWN'),
|
||||
App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS', APP_EMAIL_SECURITY)
|
||||
));
|
||||
|
@ -114,8 +109,8 @@ class TasksV1 extends Worker
|
|||
$ch,
|
||||
CURLOPT_HTTPHEADER,
|
||||
\array_merge($headers, [
|
||||
'X-'.APP_NAME.'-Task-ID: '.$task->getAttribute('$id', ''),
|
||||
'X-'.APP_NAME.'-Task-Name: '.$task->getAttribute('name', ''),
|
||||
'X-' . APP_NAME . '-Task-ID: ' . $task->getAttribute('$id', ''),
|
||||
'X-' . APP_NAME . '-Task-Name: ' . $task->getAttribute('name', ''),
|
||||
])
|
||||
);
|
||||
\curl_setopt($ch, CURLOPT_HEADER, true); // we want headers
|
||||
|
@ -138,7 +133,7 @@ class TasksV1 extends Worker
|
|||
$response = \curl_exec($ch);
|
||||
|
||||
if (false === $response) {
|
||||
$errors[] = \curl_error($ch).'Failed to execute task';
|
||||
$errors[] = \curl_error($ch) . 'Failed to execute task';
|
||||
}
|
||||
|
||||
$code = \curl_getinfo($ch, CURLINFO_HTTP_CODE);
|
||||
|
@ -154,22 +149,21 @@ class TasksV1 extends Worker
|
|||
switch ($codeFamily) {
|
||||
case '2':
|
||||
case '3':
|
||||
break;
|
||||
break;
|
||||
default:
|
||||
$errors[] = 'Request failed with status code '.$code;
|
||||
$errors[] = 'Request failed with status code ' . $code;
|
||||
}
|
||||
|
||||
if (empty($errors)) {
|
||||
$task->setAttribute('failures', 0);
|
||||
|
||||
$alert = 'Task "'.$task->getAttribute('name').'" Executed Successfully';
|
||||
$alert = 'Task "' . $task->getAttribute('name') . '" Executed Successfully';
|
||||
} else {
|
||||
$task
|
||||
->setAttribute('failures', $task->getAttribute('failures', 0) + 1)
|
||||
->setAttribute('status', ($task->getAttribute('failures') >= $errorLimit) ? 'pause' : 'play')
|
||||
;
|
||||
->setAttribute('status', ($task->getAttribute('failures') >= $errorLimit) ? 'pause' : 'play');
|
||||
|
||||
$alert = 'Task "'.$task->getAttribute('name').'" failed to execute with the following errors: '.\implode("\n", $errors);
|
||||
$alert = 'Task "' . $task->getAttribute('name') . '" failed to execute with the following errors: ' . \implode("\n", $errors);
|
||||
}
|
||||
|
||||
$log = \json_decode($task->getAttribute('log', '{}'), true);
|
||||
|
@ -190,8 +184,7 @@ class TasksV1 extends Worker
|
|||
$task
|
||||
->setAttribute('log', \json_encode($log))
|
||||
->setAttribute('duration', $totalTime)
|
||||
->setAttribute('delay', $delay)
|
||||
;
|
||||
->setAttribute('delay', $delay);
|
||||
|
||||
Authorization::disable();
|
||||
|
||||
|
@ -211,4 +204,4 @@ class TasksV1 extends Worker
|
|||
public function shutdown(): void
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,18 +4,13 @@ use Appwrite\Resque\Worker;
|
|||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
|
||||
require_once __DIR__.'/../workers.php';
|
||||
require_once __DIR__ . '/../workers.php';
|
||||
|
||||
Console::title('Usage V1 Worker');
|
||||
Console::success(APP_NAME.' usage worker v1 has started');
|
||||
Console::success(APP_NAME . ' usage worker v1 has started');
|
||||
|
||||
class UsageV1 extends Worker
|
||||
{
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
@ -33,7 +28,7 @@ class UsageV1 extends Worker
|
|||
|
||||
$networkRequestSize = $this->args['networkRequestSize'] ?? 0;
|
||||
$networkResponseSize = $this->args['networkResponseSize'] ?? 0;
|
||||
|
||||
|
||||
$httpMethod = $this->args['httpMethod'] ?? '';
|
||||
$httpRequest = $this->args['httpRequest'] ?? 0;
|
||||
|
||||
|
@ -42,30 +37,30 @@ class UsageV1 extends Worker
|
|||
$functionExecutionTime = $this->args['functionExecutionTime'] ?? 0;
|
||||
$functionStatus = $this->args['functionStatus'] ?? '';
|
||||
|
||||
$tags = ",project={$projectId},version=".App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
$tags = ",project={$projectId},version=" . App::getEnv('_APP_VERSION', 'UNKNOWN');
|
||||
|
||||
// the global namespace is prepended to every key (optional)
|
||||
$statsd->setNamespace('appwrite.usage');
|
||||
|
||||
if($httpRequest >= 1) {
|
||||
$statsd->increment('requests.all'.$tags.',method='.\strtolower($httpMethod));
|
||||
}
|
||||
|
||||
if($functionExecution >= 1) {
|
||||
$statsd->increment('executions.all'.$tags.',functionId='.$functionId.',functionStatus='.$functionStatus);
|
||||
$statsd->count('executions.time'.$tags.',functionId='.$functionId, $functionExecutionTime);
|
||||
if ($httpRequest >= 1) {
|
||||
$statsd->increment('requests.all' . $tags . ',method=' . \strtolower($httpMethod));
|
||||
}
|
||||
|
||||
$statsd->count('network.inbound'.$tags, $networkRequestSize);
|
||||
$statsd->count('network.outbound'.$tags, $networkResponseSize);
|
||||
$statsd->count('network.all'.$tags, $networkRequestSize + $networkResponseSize);
|
||||
if ($functionExecution >= 1) {
|
||||
$statsd->increment('executions.all' . $tags . ',functionId=' . $functionId . ',functionStatus=' . $functionStatus);
|
||||
$statsd->count('executions.time' . $tags . ',functionId=' . $functionId, $functionExecutionTime);
|
||||
}
|
||||
|
||||
if($storage >= 1) {
|
||||
$statsd->count('storage.all'.$tags, $storage);
|
||||
$statsd->count('network.inbound' . $tags, $networkRequestSize);
|
||||
$statsd->count('network.outbound' . $tags, $networkResponseSize);
|
||||
$statsd->count('network.all' . $tags, $networkRequestSize + $networkResponseSize);
|
||||
|
||||
if ($storage >= 1) {
|
||||
$statsd->count('storage.all' . $tags, $storage);
|
||||
}
|
||||
}
|
||||
|
||||
public function shutdown(): void
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,15 +4,13 @@ use Appwrite\Resque\Worker;
|
|||
use Utopia\App;
|
||||
use Utopia\CLI\Console;
|
||||
|
||||
require_once __DIR__.'/../workers.php';
|
||||
require_once __DIR__ . '/../workers.php';
|
||||
|
||||
Console::title('Webhooks V1 Worker');
|
||||
Console::success(APP_NAME.' webhooks worker v1 has started');
|
||||
Console::success(APP_NAME . ' webhooks worker v1 has started');
|
||||
|
||||
class WebhooksV1 extends Worker
|
||||
{
|
||||
public $args = [];
|
||||
|
||||
public function init(): void
|
||||
{
|
||||
}
|
||||
|
@ -37,7 +35,7 @@ class WebhooksV1 extends Worker
|
|||
$name = $webhook['name'] ?? '';
|
||||
$signature = $webhook['signature'] ?? 'not-yet-implemented';
|
||||
$url = $webhook['url'] ?? '';
|
||||
$security = (bool) $webhook['security'] ?? true;
|
||||
$security = (bool) ($webhook['security'] ?? true);
|
||||
$httpUser = $webhook['httpUser'] ?? null;
|
||||
$httpPass = $webhook['httpPass'] ?? null;
|
||||
|
||||
|
@ -47,7 +45,8 @@ class WebhooksV1 extends Worker
|
|||
\curl_setopt($ch, CURLOPT_POSTFIELDS, $eventData);
|
||||
\curl_setopt($ch, CURLOPT_HEADER, 0);
|
||||
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
|
||||
\curl_setopt($ch, CURLOPT_USERAGENT, \sprintf(APP_USERAGENT,
|
||||
\curl_setopt($ch, CURLOPT_USERAGENT, \sprintf(
|
||||
APP_USERAGENT,
|
||||
App::getEnv('_APP_VERSION', 'UNKNOWN'),
|
||||
App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS', APP_EMAIL_SECURITY)
|
||||
));
|
||||
|
@ -56,13 +55,13 @@ class WebhooksV1 extends Worker
|
|||
CURLOPT_HTTPHEADER,
|
||||
[
|
||||
'Content-Type: application/json',
|
||||
'Content-Length: '.\strlen($eventData),
|
||||
'X-'.APP_NAME.'-Webhook-Id: '.$id,
|
||||
'X-'.APP_NAME.'-Webhook-Event: '.$event,
|
||||
'X-'.APP_NAME.'-Webhook-Name: '.$name,
|
||||
'X-'.APP_NAME.'-Webhook-User-Id: '.$userId,
|
||||
'X-'.APP_NAME.'-Webhook-Project-Id: '.$projectId,
|
||||
'X-'.APP_NAME.'-Webhook-Signature: '.$signature,
|
||||
'Content-Length: ' . \strlen($eventData),
|
||||
'X-' . APP_NAME . '-Webhook-Id: ' . $id,
|
||||
'X-' . APP_NAME . '-Webhook-Event: ' . $event,
|
||||
'X-' . APP_NAME . '-Webhook-Name: ' . $name,
|
||||
'X-' . APP_NAME . '-Webhook-User-Id: ' . $userId,
|
||||
'X-' . APP_NAME . '-Webhook-Project-Id: ' . $projectId,
|
||||
'X-' . APP_NAME . '-Webhook-Signature: ' . $signature,
|
||||
]
|
||||
);
|
||||
|
||||
|
@ -77,7 +76,7 @@ class WebhooksV1 extends Worker
|
|||
}
|
||||
|
||||
if (false === \curl_exec($ch)) {
|
||||
$errors[] = \curl_error($ch).' in event '.$event.' for webhook '.$name;
|
||||
$errors[] = \curl_error($ch) . ' in event ' . $event . ' for webhook ' . $name;
|
||||
}
|
||||
|
||||
\curl_close($ch);
|
||||
|
@ -91,4 +90,4 @@ class WebhooksV1 extends Worker
|
|||
public function shutdown(): void
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ namespace Appwrite\Resque;
|
|||
|
||||
abstract class Worker
|
||||
{
|
||||
public $args = [];
|
||||
public array $args = [];
|
||||
|
||||
abstract public function init(): void;
|
||||
|
||||
|
@ -17,7 +17,7 @@ abstract class Worker
|
|||
$this->init();
|
||||
}
|
||||
|
||||
public function perform()
|
||||
public function perform(): void
|
||||
{
|
||||
$this->run();
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue