1
0
Fork 0
mirror of synced 2024-10-03 02:37:40 +13:00

certificates worker

This commit is contained in:
shimon 2023-09-28 20:37:07 +03:00
parent a10246a9bd
commit 9cf83151cd
3 changed files with 347 additions and 399 deletions

View file

@ -21,9 +21,9 @@ use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Helpers\ID;
use Utopia\DSN\DSN;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Storage\Device;
use Utopia\Storage\Device\Local;
use Utopia\Storage\Storage;
use Utopia\VCS\Adapter\Git\GitHub;
@ -44,19 +44,19 @@ class Builds extends Action
->desc('Builds worker')
->inject('message')
->inject('dbForConsole')
->inject('dbForProject')
->inject('queueForEvents')
->inject('queueForFunctions')
->inject('queueForUsage')
->inject('cache')
->inject('getProjectDB')
->callback(fn($message, Database $dbForConsole, Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB) => $this->action($message, $dbForConsole, $dbForProject, $queueForEvents, $queueForFunctions, $queueForUsage, $cache, $getProjectDB));
->inject('deviceFunctions')
->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB, callable $deviceFunctions) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $queueForUsage, $cache, $getProjectDB, $deviceFunctions));
}
/**
* @throws Exception|\Throwable
*/
public function action(Message $message, Database $dbForConsole, Database $dbForProject, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB): void
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, callable $getProjectDB, callable $deviceFunctions): void
{
$payload = $message->getPayload() ?? [];
@ -75,7 +75,7 @@ class Builds extends Action
case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache);
$this->buildDeployment($queueForEvents, $queueForUsage, $getProjectDB, $dbForConsole, $github, $project, $resource, $deployment, $template);
$this->buildDeployment($deviceFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $getProjectDB, $github, $project, $resource, $deployment, $template);
break;
default:
@ -88,7 +88,7 @@ class Builds extends Action
* @throws \Throwable
* @throws Structure
*/
protected function buildDeployment(Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, callable $getProjectDB, GitHub $github, Document $project, Document $function, Document $deployment, Document $template)
protected function buildDeployment(callable $deviceFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, callable $getProjectDB, GitHub $github, Document $project, Document $function, Document $deployment, Document $template)
{
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
@ -116,7 +116,6 @@ class Builds extends Action
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
}
// Realtime preparation
$allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [
'functionId' => $function->getId(),
@ -125,9 +124,7 @@ class Builds extends Action
$startTime = DateTime::now();
$durationStart = \microtime(true);
$buildId = $deployment->getAttribute('buildId', '');
$isNewBuild = empty($buildId);
if ($isNewBuild) {
@ -272,7 +269,7 @@ class Builds extends Action
Console::execute('tar --exclude code.tar.gz -czf ' . $tmpPathFile . ' -C /tmp/builds/' . \escapeshellcmd($buildId) . '/code' . (empty($rootDirectory) ? '' : '/' . $rootDirectory) . ' .', '', $stdout, $stderr);
$deviceFunctions = $this->getFunctionsDevice($project->getId());
$deviceFunctions = $deviceFunctions($project->getId());
$localDevice = new Local();
$buffer = $localDevice->read($tmpPathFile);
@ -303,6 +300,7 @@ class Builds extends Action
}
/** Trigger Webhook */
$deploymentModel = new Deployment();
$deploymentUpdate =
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
@ -311,11 +309,8 @@ class Builds extends Action
->setEvent('functions.[functionId].deployments.[deploymentId].update')
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($deployment->getArrayCopy(
array_keys(
(new Deployment())->getRules()
)
));
->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules())))
;
$deploymentUpdate->trigger();
@ -593,217 +588,3 @@ class Builds extends Action
}
}
}
// // Realtime preparation
// $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [
// 'functionId' => $function->getId(),
// 'deploymentId' => $deployment->getId()
// ]);
//
// $startTime = DateTime::now();
// $durationStart = \microtime(true);
//
//
//
// $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
// $function = $dbForProject->getDocument('functions', $function->getId());
//
// if ($function->isEmpty()) {
// throw new Exception('Function not found', 404);
// }
//
// $deployment = $dbForProject->getDocument('deployments', $deployment->getId());
// if ($deployment->isEmpty()) {
// throw new Exception('Deployment not found', 404);
// }
//
// $runtimes = Config::getParam('runtimes', []);
// $key = $function->getAttribute('runtime');
// $runtime = $runtimes[$key] ?? null;
// if (\is_null($runtime)) {
// throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
// }
//
// $connection = App::getEnv('_APP_CONNECTIONS_STORAGE', '');
// /** @TODO : move this to the registry or someplace else */
// $device = Storage::DEVICE_LOCAL;
// try {
// $dsn = new DSN($connection);
// $device = $dsn->getScheme();
// } catch (\Exception $e) {
// Console::error($e->getMessage() . 'Invalid DSN. Defaulting to Local device.');
// }
//
// $buildId = $deployment->getAttribute('buildId', '');
// $startTime = DateTime::now();
//
// if (empty($buildId)) {
// $buildId = ID::unique();
// $build = $dbForProject->createDocument('builds', new Document([
// '$id' => $buildId,
// '$permissions' => [],
// 'startTime' => $startTime,
// 'deploymentInternalId' => $deployment->getInternalId(),
// 'deploymentId' => $deployment->getId(),
// 'status' => 'processing',
// 'outputPath' => '',
// 'runtime' => $function->getAttribute('runtime'),
// 'source' => $deployment->getAttribute('path'),
// 'sourceType' => $device,
// 'stdout' => '',
// 'stderr' => '',
// 'duration' => 0
// ]));
//
// $deployment->setAttribute('buildId', $buildId);
// $deployment->setAttribute('buildInternalId', $build->getInternalId());
// $deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
// } else {
// $build = $dbForProject->getDocument('builds', $buildId);
// }
//
// /** Request the executor to build the code... */
// $build->setAttribute('status', 'building');
// $build = $dbForProject->updateDocument('builds', $buildId, $build);
//
// /** Trigger Webhook */
// $deploymentUpdate = $queueForEvents
// ->setQueue(Event::WEBHOOK_QUEUE_NAME)
// ->setClass(Event::WEBHOOK_CLASS_NAME)
// ->setProject($project)
// ->setEvent('functions.[functionId].deployments.[deploymentId].update')
// ->setParam('functionId', $function->getId())
// ->setParam('deploymentId', $deployment->getId())
// ->setPayload($deployment->getArrayCopy(
// array_keys(
// (new Deployment())->getRules()
// )
// ));
//
// $deploymentUpdate->trigger();
//
// /** Trigger Functions */
// $queueForFunctions
// ->from($deploymentUpdate)
// ->trigger();
//
//
// /** Trigger Realtime */
// $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [
// 'functionId' => $function->getId(),
// 'deploymentId' => $deployment->getId()
// ]);
// $target = Realtime::fromPayload(
// // Pass first, most verbose event pattern
// event: $allEvents[0],
// payload: $build,
// project: $project
// );
//
// Realtime::send(
// projectId: 'console',
// payload: $build->getArrayCopy(),
// events: $allEvents,
// channels: $target['channels'],
// roles: $target['roles']
// );
//
// $source = $deployment->getAttribute('path');
//
// $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
// $carry[$var->getAttribute('key')] = $var->getAttribute('value');
// return $carry;
// }, []);
//
// try {
// $response = $executor->createRuntime(
// deploymentId: $deployment->getId(),
// projectId: $project->getId(),
// source: $source,
// image: $runtime['image'],
// remove: true,
// entrypoint: $deployment->getAttribute('entrypoint'),
// workdir: '/usr/code',
// destination: APP_STORAGE_BUILDS . "/app-{$project->getId()}",
// variables: $vars,
// commands: [
// 'sh', '-c',
// 'tar -zxf /tmp/code.tar.gz -C /usr/code && \
// cd /usr/local/src/ && ./build.sh'
// ]
// );
//
// $endTime = new \DateTime();
// $endTime->setTimestamp($response['endTimeUnix']);
//
// /** Update the build document */
// $build->setAttribute('endTime', DateTime::format($endTime));
// $build->setAttribute('duration', \intval($response['duration']));
// $build->setAttribute('status', $response['status']);
// $build->setAttribute('outputPath', $response['outputPath']);
// $build->setAttribute('stderr', $response['stderr']);
// $build->setAttribute('stdout', $response['stdout']);
//
// /* Also update the deployment buildTime */
// $deployment->setAttribute('buildTime', $response['duration']);
//
// Console::success("Build id: $buildId created");
//
// /** Set auto deploy */
// if ($deployment->getAttribute('activate') === true) {
// $function->setAttribute('deploymentInternalId', $deployment->getInternalId());
// $function->setAttribute('deployment', $deployment->getId());
// $function = $dbForProject->updateDocument('functions', $function->getId(), $function);
// }
//
// /** Update function schedule */
// $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
// $schedule->setAttribute('resourceUpdatedAt', DateTime::now());
//
// $schedule
// ->setAttribute('schedule', $function->getAttribute('schedule'))
// ->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment')));
//
//
// \Utopia\Database\Validator\Authorization::skip(fn() => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
// } catch (\Throwable $th) {
// $endTime = DateTime::now();
// $interval = (new \DateTime($endTime))->diff(new \DateTime($startTime));
// $build->setAttribute('endTime', $endTime);
// $build->setAttribute('duration', $interval->format('%s') + 0);
// $build->setAttribute('status', 'failed');
// $build->setAttribute('stderr', $th->getMessage());
// Console::error($th->getMessage());
// } finally {
// $build = $dbForProject->updateDocument('builds', $buildId, $build);
//
// /**
// * Send realtime Event
// */
// $target = Realtime::fromPayload(
// // Pass first, most verbose event pattern
// event: $allEvents[0],
// payload: $build,
// project: $project
// );
// Realtime::send(
// projectId: 'console',
// payload: $build->getArrayCopy(),
// events: $allEvents,
// channels: $target['channels'],
// roles: $target['roles']
// );
//
// /** Trigger usage queue */
// $queueForUsage
// ->setProject($project)
// ->addMetric(METRIC_BUILDS, 1) // per project
// ->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0))
// ->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000)
// ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS), 1) // per function
// ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE), $build->getAttribute('size', 0))
// ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE), (int)$build->getAttribute('duration', 0) * 1000)
// ->trigger();
// }
// }
//}

View file

@ -2,9 +2,13 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Network\Validator\CNAME;
use Appwrite\Template\Template;
use Appwrite\Utopia\Response\Model\Rule;
use Exception;
use Throwable;
use Utopia\App;
@ -36,13 +40,15 @@ class Certificates extends Action
->inject('message')
->inject('dbForConsole')
->inject('queueForMails')
->callback(fn($message, $dbForConsole, $queueForMails) => $this->action($message, $dbForConsole, $queueForMails));
->inject('queueForEvents')
->inject('queueForFunctions')
->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions));
}
/**
* @throws Exception|Throwable
*/
public function action(Message $message, Database $dbForConsole, Mail $queueForMails): void
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions): void
{
$payload = $message->getPayload() ?? [];
@ -54,13 +60,13 @@ class Certificates extends Action
$domain = new Domain($document->getAttribute('domain', ''));
$skipRenewCheck = $payload['skipRenewCheck'] ?? false;
$this->execute($domain, $dbForConsole, $queueForMails, $skipRenewCheck);
$this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $skipRenewCheck);
}
/**
* @throws Exception|Throwable
*/
private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, bool $skipRenewCheck = false): void
private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, bool $skipRenewCheck = false): void
{
/**
* 1. Read arguments and validate domain
@ -101,6 +107,8 @@ class Certificates extends Action
$certificate->setAttribute('domain', $domain->get());
}
$success = false;
try {
// Email for alerts is required by LetsEncrypt
$email = App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS');
@ -127,11 +135,9 @@ class Certificates extends Action
$letsEncryptData = $this->issueCertificate($folder, $domain->get(), $email);
// Command succeeded, store all data into document
// We store stderr too, because it may include warnings
$certificate->setAttribute('log', \json_encode([
'stdout' => $letsEncryptData['stdout'],
'stderr' => $letsEncryptData['stderr'],
]));
$logs = 'Certificate successfully generated.';
$certificate->setAttribute('logs', \mb_strcut($logs, 0, 1000000));// Limit to 1MB
// Give certificates to Traefik
$this->applyCertificateFiles($folder, $domain->get(), $letsEncryptData);
@ -140,9 +146,12 @@ class Certificates extends Action
$certificate->setAttribute('renewDate', $this->getRenewDate($domain->get()));
$certificate->setAttribute('attempts', 0);
$certificate->setAttribute('issueDate', DateTime::now());
$success = true;
} catch (Throwable $e) {
$logs = $e->getMessage();
// Set exception as log in certificate document
$certificate->setAttribute('log', $e->getMessage());
$certificate->setAttribute('logs', \mb_strcut($logs, 0, 1000000));// Limit to 1MB
// Increase attempts count
$attempts = $certificate->getAttribute('attempts', 0) + 1;
@ -158,26 +167,45 @@ class Certificates extends Action
$certificate->setAttribute('updated', DateTime::now());
// Save all changes we made to certificate document into database
$this->saveCertificateDocument($domain->get(), $certificate, $dbForConsole);
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions);
}
}
/**
* Save certificate data into database.
*
* @param string $domain Domain name that certificate is for
* @param Document $certificate Certificate document that we need to save
* @param Database $dbForConsole Database connection for console
* @return void
* @throws Exception|Throwable
*/
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void
{
// Check if update or insert required
$certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]);
if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) {
// Merge new data with current data
$certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy()));
$certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate);
} else {
$certificate = $dbForConsole->createDocument('certificates', $certificate);
}
$certificateId = $certificate->getId();
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions);
}
/**
* Get main domain. Needed as we do different checks for main and non-main domains.
*
* @return null|string Returns main domain. If null, there is no main domain yet.
* @throws Exception
*/
private function getMainDomain(Database $dbForConsole): ?string
{
$envDomain = App::getEnv('_APP_DOMAIN', '');
if (!empty($envDomain) && $envDomain !== 'localhost') {
return $envDomain;
} else {
$domainDocument = $dbForConsole->findOne('domains', [Query::orderAsc('_id')]);
if ($domainDocument) {
return $domainDocument->getAttribute('domain');
}
}
return null;
@ -190,6 +218,7 @@ class Certificates extends Action
*
* @param Domain $domain Domain which we validate
* @param bool $isMainDomain In case of master domain, we look for different DNS configurations
*
* @return void
* @throws Exception
*/
@ -267,7 +296,7 @@ class Certificates extends Action
$stderr = '';
$staging = (App::isProduction()) ? '' : ' --dry-run';
$exit = Console::execute("certbot certonly --webroot --noninteractive --agree-tos{$staging}"
$exit = Console::execute("certbot certonly -v --webroot --noninteractive --agree-tos{$staging}"
. " --email " . $email
. " --cert-name " . $folder
. " -w " . APP_STORAGE_CERTIFICATES
@ -284,6 +313,22 @@ class Certificates extends Action
];
}
/**
* Read new renew date from certificate file generated by Let's Encrypt
*
* @param string $domain Domain which certificate was generated for
* @return string
* @throws \Utopia\Database\Exception
*/
private function getRenewDate(string $domain): string
{
$certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem';
$certData = openssl_x509_parse(file_get_contents($certPath));
$validTo = $certData['validTo_time_t'] ?? null;
$dt = (new \DateTime())->setTimestamp($validTo);
return DateTime::addSeconds($dt, -60 * 60 * 24 * 30); // -30 days
}
/**
* Method to take files from Let's Encrypt, and put it into Traefik.
*
@ -349,19 +394,20 @@ class Certificates extends Action
// Log error into console
Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage);
// Send mail to administrator mail
// Send mail to administratore mail
$locale = new Locale(App::getEnv('_APP_LOCALE', 'en'));
if (!$locale->getText('emails.sender') || !$locale->getText("emails.certificate.hello") || !$locale->getText("emails.certificate.subject") || !$locale->getText("emails.certificate.body") || !$locale->getText("emails.certificate.footer") || !$locale->getText("emails.certificate.thanks") || !$locale->getText("emails.certificate.signature")) {
$locale->setDefault('en');
}
$body = Template::fromFile(__DIR__ . '/../../config/locale/templates/email-base.tpl');
$body = Template::fromFile(__DIR__ . '/../config/locale/templates/email-base.tpl');
$subject = \sprintf($locale->getText("emails.certificate.subject"), $domain);
$body->setParam('{{domain}}', $domain);
$body->setParam('{{error}}', $errorMessage);
$body->setParam('{{attempt}}', $attempt);
$body
->setParam('{{domain}}', $domain)
->setParam('{{error}}', $errorMessage)
->setParam('{{attempt}}', $attempt)
->setParam('{{subject}}', $subject)
->setParam('{{hello}}', $locale->getText("emails.certificate.hello"))
->setParam('{{body}}', $locale->getText("emails.certificate.body"))
@ -391,50 +437,69 @@ class Certificates extends Action
*
* @param string $certificateId ID of a new or updated certificate document
* @param string $domain Domain that is affected by new certificate
* @param Database $dbForConsole Database instance for console
* @param bool $success Was certificate generation successful?
*
* @return void
* @throws Exception
*/
private function updateDomainDocuments(string $certificateId, string $domain, Database $dbForConsole): void
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void
{
$domains = $dbForConsole->find('domains', [
$rule = $dbForConsole->findOne('rules', [
Query::equal('domain', [$domain]),
Query::limit(1000),
]);
foreach ($domains as $domainDocument) {
$domainDocument->setAttribute('updated', DateTime::now());
$domainDocument->setAttribute('certificateId', $certificateId);
$dbForConsole->updateDocument('domains', $domainDocument->getId(), $domainDocument);
if ($rule !== false && !$rule->isEmpty()) {
$rule->setAttribute('certificateId', $certificateId);
$rule->setAttribute('status', $success ? 'verified' : 'unverified');
$dbForConsole->updateDocument('rules', $rule->getId(), $rule);
if ($domainDocument->getAttribute('projectId')) {
$dbForConsole->deleteCachedDocument('projects', $domainDocument->getAttribute('projectId'));
$projectId = $rule->getAttribute('projectId');
// Skip events for console project (triggered by auto-ssl generation for 1 click setups)
if ($projectId === 'console') {
return;
}
}
}
/**
* Save certificate data into database.
*
* @param string $domain Domain name that certificate is for
* @param Document $certificate Certificate document that we need to save
* @param Database $dbForConsole Database connection for console
* @return void
* @throws Exception|\Throwable
*/
private function saveCertificateDocument(string $domain, Document $certificate, Database $dbForConsole): void
{
// Check if update or insert required
$certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]);
if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) {
// Merge new data with current data
$certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy()));
$certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate);
} else {
$certificate = $dbForConsole->createDocument('certificates', $certificate);
}
$project = $dbForConsole->getDocument('projects', $projectId);
$certificateId = $certificate->getId();
$this->updateDomainDocuments($certificateId, $domain, $dbForConsole);
/** Trigger Webhook */
$ruleModel = new Rule();
$queueForEvents
->setProject($project)
->setEvent('rules.[ruleId].update')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
->trigger();
/** Trigger Functions */
$queueForFunctions
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('rules.[ruleId].update', [
'ruleId' => $rule->getId(),
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $rule,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
}
}
}

View file

@ -9,6 +9,7 @@ use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Utopia\Response\Model\Execution;
use Exception;
use Executor\Executor;
use Throwable;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Config\Config;
@ -20,6 +21,7 @@ use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Permission;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Query;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
@ -42,13 +44,14 @@ class Functions extends Action
->inject('queueForFunctions')
->inject('queueForEvents')
->inject('queueForUsage')
->callback(fn($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage));
->inject('log')
->callback(fn(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log));
}
/**
* @throws Exception|\Throwable
* @throws Exception|Throwable
*/
public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage): void
public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -69,6 +72,9 @@ class Functions extends Action
$project = new Document($payload['project'] ?? []);
$function = new Document($payload['function'] ?? []);
$user = new Document($payload['user'] ?? []);
$method = $payload['method'] ?? 'POST';
$headers = $payload['headers'] ?? [];
$path = $payload['path'] ?? '/';
if ($project->getId() === 'console') {
return;
@ -99,16 +105,25 @@ class Functions extends Action
Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
queueForEvents: $queueForEvents,
queueForUsage: $queueForUsage,
queueForEvents: $queueForEvents,
project: $project,
function: $function,
trigger: 'event',
path: '/',
method: 'POST',
headers: [
'user-agent' => 'Appwrite/' . APP_VERSION_STABLE
],
data: null,
user: $user,
jwt: null,
event: $events[0],
eventData: \is_string($eventData) ? $eventData : \json_encode($eventData),
executionId: null,
);
Console::success('Triggered function: ' . $events[0]);
}
@ -125,28 +140,44 @@ class Functions extends Action
$execution = new Document($payload['execution'] ?? []);
$user = new Document($payload['user'] ?? []);
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
queueForEvents: $queueForEvents,
queueForUsage: $queueForUsage,
queueForEvents: $queueForEvents,
project: $project,
function: $function,
trigger: 'http',
path: $path,
method: $method,
headers: $headers,
data: $data,
user: $user,
jwt: $jwt,
executionId: $execution->getId(),
event: null,
eventData: null,
executionId: $execution->getId()
);
break;
case 'schedule':
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
queueForEvents: $queueForEvents,
queueForUsage: $queueForUsage,
queueForEvents: $queueForEvents,
project: $project,
function: $function,
trigger: 'schedule',
path: $path,
method: $method,
headers: $headers,
data: null,
user: null,
jwt: null,
event: null,
eventData: null,
executionId: null,
);
break;
}
@ -154,17 +185,21 @@ class Functions extends Action
/**
* @throws Authorization
* @throws \Throwable
* @throws Throwable
* @throws Structure
*/
private function execute(
Log $log,
Database $dbForProject,
Func $queueForFunctions,
Event $queueForEvents,
Usage $queueForUsage,
Event $queueForEvents,
Document $project,
Document $function,
string $trigger,
string $path,
string $method,
array $headers,
string $data = null,
?Document $user = null,
string $jwt = null,
@ -177,6 +212,9 @@ class Functions extends Action
$functionInternalId = $function->getInternalId();
$deploymentId = $function->getAttribute('deployment', '');
$log->addTag('functionId', $functionId);
$log->addTag('projectId', $project->getId());
/** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
$deploymentInternalId = $deployment->getInternalId();
@ -200,157 +238,221 @@ class Functions extends Action
}
/** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []);
$version = $function->getAttribute('version', 'v2');
$runtimes = Config::getParam($version === 'v2' ? 'runtimes-v2' : 'runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
}
$runtime = $runtimes[$function->getAttribute('runtime')];
$runtime = $runtimes[$function->getAttribute('runtime')];
/** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
$headers['x-appwrite-trigger'] = $trigger;
$headers['x-appwrite-event'] = $event ?? '';
$headers['x-appwrite-user-id'] = $user->getId() ?? '';
$headers['x-appwrite-user-jwt'] = $jwt ?? '';
/** Create execution or update execution status */
/** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) {
$headersFiltered = [];
foreach ($headers as $key => $value) {
if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_REQUEST)) {
$headersFiltered[] = [ 'name' => $key, 'value' => $value ];
}
}
$executionId = ID::unique();
$execution = $dbForProject->createDocument('executions', new Document([
$execution = new Document([
'$id' => $executionId,
'$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))],
'functionId' => $functionId,
'functionInternalId' => $functionInternalId,
'deploymentInternalId' => $deploymentInternalId,
'deploymentId' => $deploymentId,
'functionInternalId' => $function->getInternalId(),
'functionId' => $function->getId(),
'deploymentInternalId' => $deployment->getInternalId(),
'deploymentId' => $deployment->getId(),
'trigger' => $trigger,
'status' => 'waiting',
'statusCode' => 0,
'response' => '',
'stderr' => '',
'status' => 'processing',
'responseStatusCode' => 0,
'responseHeaders' => [],
'requestPath' => $path,
'requestMethod' => $method,
'requestHeaders' => $headersFiltered,
'errors' => '',
'logs' => '',
'duration' => 0.0,
'search' => implode(' ', [$function->getId(), $executionId]),
]));
'search' => implode(' ', [$functionId, $executionId]),
]);
if ($function->getAttribute('logging')) {
$execution = $dbForProject->createDocument('executions', $execution);
}
// TODO: @Meldiron Trigger executions.create event here
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
/*** Usage */
$queueForUsage
->addMetric(METRIC_EXECUTIONS, 1) // per project
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1); // per function
}
if ($execution->getAttribute('status') !== 'processing') {
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
return $carry;
}, []);
if ($function->getAttribute('logging')) {
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
}
}
/** Collect environment variables */
$durationStart = \microtime(true);
$body = $eventData ?? '';
if (empty($body)) {
$body = $data ?? '';
}
$vars = [];
// V2 vars
if ($version === 'v2') {
$vars = \array_merge($vars, [
'APPWRITE_FUNCTION_ID' => $functionId,
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'),
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
'APPWRITE_FUNCTION_TRIGGER' => $trigger,
'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '',
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '',
'APPWRITE_FUNCTION_EVENT' => $event ?? '',
'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '',
'APPWRITE_FUNCTION_DATA' => $data ?? '',
'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '',
'APPWRITE_FUNCTION_JWT' => $jwt ?? '',
'APPWRITE_FUNCTION_TRIGGER' => $headers['x-appwrite-trigger'] ?? '',
'APPWRITE_FUNCTION_DATA' => $body ?? '',
'APPWRITE_FUNCTION_EVENT_DATA' => $body ?? '',
'APPWRITE_FUNCTION_EVENT' => $headers['x-appwrite-event'] ?? '',
'APPWRITE_FUNCTION_USER_ID' => $headers['x-appwrite-user-id'] ?? '',
'APPWRITE_FUNCTION_JWT' => $headers['x-appwrite-user-jwt'] ?? ''
]);
}
/** Execute function */
// Shared vars
foreach ($function->getAttribute('varsProject', []) as $var) {
$vars[$var->getAttribute('key')] = $var->getAttribute('value', '');
}
// Function vars
foreach ($function->getAttribute('vars', []) as $var) {
$vars[$var->getAttribute('key')] = $var->getAttribute('value', '');
}
// Appwrite vars
$vars = \array_merge($vars, [
'APPWRITE_FUNCTION_ID' => $functionId,
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'),
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '',
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '',
]);
/** Execute function */
try {
$client = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
$executionResponse = $client->createExecution(
$version = $function->getAttribute('version', 'v2');
$command = $runtime['startCommand'];
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
$command = $version === 'v2' ? '' : 'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '"';
$executionResponse = $executor->createExecution(
projectId: $project->getId(),
deploymentId: $deploymentId,
payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
body: \strlen($body) > 0 ? $body : null,
variables: $vars,
timeout: $function->getAttribute('timeout', 0),
image: $runtime['image'],
source: $build->getAttribute('outputPath', ''),
source: $build->getAttribute('path', ''),
entrypoint: $deployment->getAttribute('entrypoint', ''),
version: $version,
path: $path,
method: $method,
headers: $headers,
runtimeEntrypoint: $command
);
$status = $executionResponse['statusCode'] >= 400 ? 'failed' : 'completed';
$headersFiltered = [];
foreach ($executionResponse['headers'] as $key => $value) {
if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_RESPONSE)) {
$headersFiltered[] = [ 'name' => $key, 'value' => $value ];
}
}
/** Update execution status */
$execution
->setAttribute('status', $executionResponse['status'])
->setAttribute('statusCode', $executionResponse['statusCode'])
->setAttribute('response', $executionResponse['response'])
->setAttribute('stdout', $executionResponse['stdout'])
->setAttribute('stderr', $executionResponse['stderr'])
->setAttribute('status', $status)
->setAttribute('responseStatusCode', $executionResponse['statusCode'])
->setAttribute('responseHeaders', $headersFiltered)
->setAttribute('logs', $executionResponse['logs'])
->setAttribute('errors', $executionResponse['errors'])
->setAttribute('duration', $executionResponse['duration']);
} catch (\Throwable $th) {
$interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt()));
$durationEnd = \microtime(true);
$execution
->setAttribute('duration', (float)$interval->format('%s.%f'))
->setAttribute('duration', $durationEnd - $durationStart)
->setAttribute('status', 'failed')
->setAttribute('statusCode', $th->getCode())
->setAttribute('stderr', $th->getMessage());
->setAttribute('responseStatusCode', 500)
->setAttribute('errors', $th->getMessage() . '\nError Code: ' . $th->getCode());
Console::error($th->getTraceAsString());
Console::error($th->getFile());
Console::error($th->getLine());
Console::error($th->getMessage());
$error = $th->getMessage();
$errorCode = $th->getCode();
}
if ($function->getAttribute('logging')) {
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
}
/** Trigger Webhook */
$executionModel = new Execution();
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->setProject($project)
->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
->trigger();
/** Trigger Webhook */
$executionModel = new Execution();
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->setProject($project)
->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
->trigger();
/** Trigger Functions */
$queueForFunctions
->from($queueForEvents)
->trigger();
/** Trigger Functions */
$queueForFunctions
->from($queueForEvents)
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
'functionId' => $function->getId(),
'executionId' => $execution->getId()
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution
);
Realtime::send(
projectId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Trigger realtime event */
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
'functionId' => $function->getId(),
'executionId' => $execution->getId()
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution
);
Realtime::send(
projectId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
if (!empty($error)) {
throw new Exception($error, $errorCode);
}
/** Trigger usage queue */
$queueForUsage
->setProject($project)
->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000))
->trigger()
;
/** Trigger usage queue */
$queueForUsage
->setProject($project)
->addMetric(METRIC_EXECUTIONS, 1)
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1)
->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000))
->trigger()
;
}
}