parent
0ea1418132
commit
025a0292f4
9 changed files with 1802 additions and 0 deletions
|
@ -0,0 +1,18 @@
|
|||
import { Client, Health } from "@appwrite.io/console";
|
||||
|
||||
const client = new Client();
|
||||
|
||||
const health = new Health(client);
|
||||
|
||||
client
|
||||
.setEndpoint('https://cloud.appwrite.io/v1') // Your API Endpoint
|
||||
.setProject('5df5acd0d48c2') // Your project ID
|
||||
;
|
||||
|
||||
const promise = health.getQueueBuilds();
|
||||
|
||||
promise.then(function (response) {
|
||||
console.log(response); // Success
|
||||
}, function (error) {
|
||||
console.log(error); // Failure
|
||||
});
|
|
@ -0,0 +1,18 @@
|
|||
import { Client, Health } from "@appwrite.io/console";
|
||||
|
||||
const client = new Client();
|
||||
|
||||
const health = new Health(client);
|
||||
|
||||
client
|
||||
.setEndpoint('https://cloud.appwrite.io/v1') // Your API Endpoint
|
||||
.setProject('5df5acd0d48c2') // Your project ID
|
||||
;
|
||||
|
||||
const promise = health.getQueueDatabases();
|
||||
|
||||
promise.then(function (response) {
|
||||
console.log(response); // Success
|
||||
}, function (error) {
|
||||
console.log(error); // Failure
|
||||
});
|
|
@ -0,0 +1,18 @@
|
|||
import { Client, Health } from "@appwrite.io/console";
|
||||
|
||||
const client = new Client();
|
||||
|
||||
const health = new Health(client);
|
||||
|
||||
client
|
||||
.setEndpoint('https://cloud.appwrite.io/v1') // Your API Endpoint
|
||||
.setProject('5df5acd0d48c2') // Your project ID
|
||||
;
|
||||
|
||||
const promise = health.getQueueDeletes();
|
||||
|
||||
promise.then(function (response) {
|
||||
console.log(response); // Success
|
||||
}, function (error) {
|
||||
console.log(error); // Failure
|
||||
});
|
|
@ -0,0 +1,18 @@
|
|||
import { Client, Health } from "@appwrite.io/console";
|
||||
|
||||
const client = new Client();
|
||||
|
||||
const health = new Health(client);
|
||||
|
||||
client
|
||||
.setEndpoint('https://cloud.appwrite.io/v1') // Your API Endpoint
|
||||
.setProject('5df5acd0d48c2') // Your project ID
|
||||
;
|
||||
|
||||
const promise = health.getQueueMails();
|
||||
|
||||
promise.then(function (response) {
|
||||
console.log(response); // Success
|
||||
}, function (error) {
|
||||
console.log(error); // Failure
|
||||
});
|
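--- a/src/Appwrite/Platform/Workers/Certificates.php
+++ b/src/Appwrite/Platform/Workers/Certificates.php
@@ -0,0 +1,534 @@
+<?php
+
+namespace Appwrite\Platform\Workers;
+
+use Appwrite\Event\Event;
+use Appwrite\Event\Func;
+use Appwrite\Event\Mail;
+use Appwrite\Messaging\Adapter\Realtime;
+use Appwrite\Network\Validator\CNAME;
+use Appwrite\Template\Template;
+use Appwrite\Utopia\Response\Model\Rule;
+use Exception;
+use Throwable;
+use Utopia\App;
+use Utopia\CLI\Console;
+use Utopia\Database\Database;
+use Utopia\Database\DateTime;
+use Utopia\Database\Document;
+use Utopia\Database\Exception\Authorization;
+use Utopia\Database\Exception\Conflict;
+use Utopia\Database\Exception\Structure;
+use Utopia\Database\Helpers\ID;
+use Utopia\Database\Query;
+use Utopia\Domains\Domain;
+use Utopia\Locale\Locale;
+use Utopia\Platform\Action;
+use Utopia\Queue\Message;
+
+class Certificates extends Action
+{
+public static function getName(): string
+{
+return 'certificates';
+}
+
+/**
+* @throws Exception
+*/
+public function __construct()
+{
+$this
+->desc('Certificates worker')
+->inject('message')
+->inject('dbForConsole')
+->inject('queueForMails')
+->inject('queueForEvents')
+->inject('queueForFunctions')
+->callback(fn(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions));
+}
+
+/**
+* @param Message $message
+* @param Database $dbForConsole
+* @param Mail $queueForMails
+* @param Event $queueForEvents
+* @param Func $queueForFunctions
+* @return void
+* @throws Throwable
+* @throws \Utopia\Database\Exception
+*/
+public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions): void
+{
+$payload = $message->getPayload() ?? [];
+
+if (empty($payload)) {
+throw new Exception('Missing payload');
+}
+
+$document = new Document($payload['domain'] ?? []);
+$domain = new Domain($document->getAttribute('domain', ''));
+$skipRenewCheck = $payload['skipRenewCheck'] ?? false;
+
+$this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $skipRenewCheck);
+}
+
+/**
+* @param Domain $domain
+* @param Database $dbForConsole
+* @param Mail $queueForMails
+* @param Event $queueForEvents
+* @param Func $queueForFunctions
+* @param bool $skipRenewCheck
+* @return void
+* @throws Throwable
+* @throws \Utopia\Database\Exception
+*/
+private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, bool $skipRenewCheck = false): void
+{
+/**
+* 1. Read arguments and validate domain
+* 2. Get main domain
+* 3. Validate CNAME DNS if parameter is not main domain (meaning it's custom domain)
+* 4. Validate security email. Cannot be empty, required by LetsEncrypt
+* 5. Validate renew date with certificate file, unless requested to skip by parameter
+* 6. Issue a certificate using certbot CLI
+* 7. Update 'log' attribute on certificate document with Certbot message
+* 8. Create storage folder for certificate, if not ready already
+* 9. Move certificates from Certbot location to our Storage
+* 10. Create/Update our Storage with new Traefik config with new certificate paths
+* 11. Read certificate file and update 'renewDate' on certificate document
+* 12. Update 'issueDate' and 'attempts' on certificate
+*
+* If at any point unexpected error occurs, program stops without applying changes to document, and error is thrown into worker
+*
+* If code stops with expected error:
+* 1. 'log' attribute on document is updated with error message
+* 2. 'attempts' amount is increased
+* 3. Console log is shown
+* 4. Email is sent to security email
+*
+* Unless unexpected error occurs, at the end, we:
+* 1. Update 'updated' attribute on document
+* 2. Save document to database
+* 3. Update all domains documents with current certificate ID
+*
+* Note: Renewals are checked and scheduled from maintenence worker
+*/
+
+// Get current certificate
+$certificate = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain->get()])]);
+
+// If we don't have certificate for domain yet, let's create new document. At the end we save it
+if (!$certificate) {
+$certificate = new Document();
+$certificate->setAttribute('domain', $domain->get());
+}
+
+$success = false;
+
+try {
+// Email for alerts is required by LetsEncrypt
+$email = App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS');
+if (empty($email)) {
+throw new Exception('You must set a valid security email address (_APP_SYSTEM_SECURITY_EMAIL_ADDRESS) to issue an SSL certificate.');
+}
+
+// Validate domain and DNS records. Skip if job is forced
+if (!$skipRenewCheck) {
+$mainDomain = $this->getMainDomain();
+$isMainDomain = !isset($mainDomain) || $domain->get() === $mainDomain;
+$this->validateDomain($domain, $isMainDomain);
+}
+
+// If certificate exists already, double-check expiry date. Skip if job is forced
+if (!$skipRenewCheck && !$this->isRenewRequired($domain->get())) {
+throw new Exception('Renew isn\'t required.');
+}
+
+// Prepare folder name for certbot. Using this helps prevent miss-match in LetsEncrypt configuration when renewing certificate
+$folder = ID::unique();
+
+// Generate certificate files using Let's Encrypt
+$letsEncryptData = $this->issueCertificate($folder, $domain->get(), $email);
+
+// Command succeeded, store all data into document
+$logs = 'Certificate successfully generated.';
+$certificate->setAttribute('logs', \mb_strcut($logs, 0, 1000000));// Limit to 1MB
+
+
+// Give certificates to Traefik
+$this->applyCertificateFiles($folder, $domain->get(), $letsEncryptData);
+
+// Update certificate info stored in database
+$certificate->setAttribute('renewDate', $this->getRenewDate($domain->get()));
+$certificate->setAttribute('attempts', 0);
+$certificate->setAttribute('issueDate', DateTime::now());
+$success = true;
+} catch (Throwable $e) {
+$logs = $e->getMessage();
+
+// Set exception as log in certificate document
+$certificate->setAttribute('logs', \mb_strcut($logs, 0, 1000000));// Limit to 1MB
+
+// Increase attempts count
+$attempts = $certificate->getAttribute('attempts', 0) + 1;
+$certificate->setAttribute('attempts', $attempts);
+
+// Store cuttent time as renew date to ensure another attempt in next maintenance cycle
+$certificate->setAttribute('renewDate', DateTime::now());
+
+// Send email to security email
+$this->notifyError($domain->get(), $e->getMessage(), $attempts, $queueForMails);
+} finally {
+// All actions result in new updatedAt date
+$certificate->setAttribute('updated', DateTime::now());
+
+// Save all changes we made to certificate document into database
+$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions);
+}
+}
+
+/**
+* Save certificate data into database.
+*
+* @param string $domain Domain name that certificate is for
+* @param Document $certificate Certificate document that we need to save
+* @param bool $success
+* @param Database $dbForConsole Database connection for console
+* @param Event $queueForEvents
+* @param Func $queueForFunctions
+* @return void
+* @throws \Utopia\Database\Exception
+* @throws Authorization
+* @throws Conflict
+* @throws Structure
+*/
+private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void
+{
+// Check if update or insert required
+$certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]);
+if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) {
+// Merge new data with current data
+$certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy()));
+$certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate);
+} else {
+$certificate->removeAttribute('$internalId');
+$certificate = $dbForConsole->createDocument('certificates', $certificate);
+}
+
+$certificateId = $certificate->getId();
+$this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions);
+}
+
+/**
+* Get main domain. Needed as we do different checks for main and non-main domains.
+*
+* @return null|string Returns main domain. If null, there is no main domain yet.
+*/
+private function getMainDomain(): ?string
+{
+$envDomain = App::getEnv('_APP_DOMAIN', '');
+if (!empty($envDomain) && $envDomain !== 'localhost') {
+return $envDomain;
+}
+
+return null;
+}
+
+/**
+* Internal domain validation functionality to prevent unnecessary attempts failed from Let's Encrypt side. We check:
+* - Domain needs to be public and valid (prevents NFT domains that are not supported by Let's Encrypt)
+* - Domain must have proper DNS record
+*
+* @param Domain $domain Domain which we validate
+* @param bool $isMainDomain In case of master domain, we look for different DNS configurations
+*
+* @return void
+* @throws Exception
+*/
+private function validateDomain(Domain $domain, bool $isMainDomain): void
+{
+if (empty($domain->get())) {
+throw new Exception('Missing certificate domain.');
+}
+
+if (!$domain->isKnown() || $domain->isTest()) {
+throw new Exception('Unknown public suffix for domain.');
+}
+
+if (!$isMainDomain) {
+// TODO: Would be awesome to also support A/AAAA records here. Maybe dry run?
+// Validate if domain target is properly configured
+$target = new Domain(App::getEnv('_APP_DOMAIN_TARGET', ''));
+
+if (!$target->isKnown() || $target->isTest()) {
+throw new Exception('Unreachable CNAME target (' . $target->get() . '), please use a domain with a public suffix.');
+}
+
+// Verify domain with DNS records
+$validator = new CNAME($target->get());
+if (!$validator->isValid($domain->get())) {
+throw new Exception('Failed to verify domain DNS records.');
+}
+} else {
+// Main domain validation
+// TODO: Would be awesome to check A/AAAA record here. Maybe dry run?
+}
+}
+
+/**
+* Reads expiry date of certificate from file and decides if renewal is required or not.
+*
+* @param string $domain Domain for which we check certificate file
+* @return bool True, if certificate needs to be renewed
+* @throws Exception
+*/
+private function isRenewRequired(string $domain): bool
+{
+$certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem';
+if (\file_exists($certPath)) {
+$validTo = null;
+
+$certData = openssl_x509_parse(file_get_contents($certPath));
+$validTo = $certData['validTo_time_t'] ?? 0;
+
+if (empty($validTo)) {
+throw new Exception('Unable to read certificate file (cert.pem).');
+}
+
+// LetsEncrypt allows renewal 30 days before expiry
+$expiryInAdvance = (60 * 60 * 24 * 30);
+if ($validTo - $expiryInAdvance > \time()) {
+return false;
+}
+}
+
+return true;
+}
+
+/**
+* LetsEncrypt communication to issue certificate (using certbot CLI)
+*
+* @param string $folder Folder into which certificates should be generated
+* @param string $domain Domain to generate certificate for
+* @return array Named array with keys 'stdout' and 'stderr', both string
+* @throws Exception
+*/
+private function issueCertificate(string $folder, string $domain, string $email): array
+{
+$stdout = '';
+$stderr = '';
+
+$staging = (App::isProduction()) ? '' : ' --dry-run';
+$exit = Console::execute("certbot certonly -v --webroot --noninteractive --agree-tos{$staging}"
+. " --email " . $email
+. " --cert-name " . $folder
+. " -w " . APP_STORAGE_CERTIFICATES
+. " -d {$domain}", '', $stdout, $stderr);
+
+// Unexpected error, usually 5XX, API limits, ...
+if ($exit !== 0) {
+throw new Exception('Failed to issue a certificate with message: ' . $stderr);
+}
+
+return [
+'stdout' => $stdout,
+'stderr' => $stderr
+];
+}
+
+/**
+* Read new renew date from certificate file generated by Let's Encrypt
+*
+* @param string $domain Domain which certificate was generated for
+* @return string
+* @throws \Utopia\Database\Exception
+*/
+private function getRenewDate(string $domain): string
+{
+$certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem';
+$certData = openssl_x509_parse(file_get_contents($certPath));
+$validTo = $certData['validTo_time_t'] ?? null;
+$dt = (new \DateTime())->setTimestamp($validTo);
+return DateTime::addSeconds($dt, -60 * 60 * 24 * 30); // -30 days
+}
+
+/**
+* Method to take files from Let's Encrypt, and put it into Traefik.
+*
+* @param string $domain Domain which certificate was generated for
+* @param string $folder Folder in which certificates were generated
+* @param array $letsEncryptData Let's Encrypt logs to use for additional info when throwing error
+* @return void
+* @throws Exception
+*/
+private function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void
+{
+
+// Prepare folder in storage for domain
+$path = APP_STORAGE_CERTIFICATES . '/' . $domain;
+if (!\is_readable($path)) {
+if (!\mkdir($path, 0755, true)) {
+throw new Exception('Failed to create path for certificate.');
+}
+}
+
+// Move generated files
+if (!@\rename('/etc/letsencrypt/live/' . $folder . '/cert.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem')) {
+throw new Exception('Failed to rename certificate cert.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']);
+}
+
+if (!@\rename('/etc/letsencrypt/live/' . $folder . '/chain.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/chain.pem')) {
+throw new Exception('Failed to rename certificate chain.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']);
+}
+
+if (!@\rename('/etc/letsencrypt/live/' . $folder . '/fullchain.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/fullchain.pem')) {
+throw new Exception('Failed to rename certificate fullchain.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']);
+}
+
+if (!@\rename('/etc/letsencrypt/live/' . $folder . '/privkey.pem', APP_STORAGE_CERTIFICATES . '/' . $domain . '/privkey.pem')) {
+throw new Exception('Failed to rename certificate privkey.pem. Let\'s Encrypt log: ' . $letsEncryptData['stderr'] . ' ; ' . $letsEncryptData['stdout']);
+}
+
+$config = \implode(PHP_EOL, [
+"tls:",
+" certificates:",
+" - certFile: /storage/certificates/{$domain}/fullchain.pem",
+" keyFile: /storage/certificates/{$domain}/privkey.pem"
+]);
+
+// Save configuration into Traefik using our new cert files
+if (!\file_put_contents(APP_STORAGE_CONFIG . '/' . $domain . '.yml', $config)) {
+throw new Exception('Failed to save Traefik configuration.');
+}
+}
+
+/**
+* Method to make sure information about error is delivered to admnistrator.
+*
+* @param string $domain Domain that caused the error
+* @param string $errorMessage Verbose error message
+* @param int $attempt How many times it failed already
+* @param Mail $queueForMails
+* @return void
+* @throws Exception
+*/
+private function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMails): void
+{
+// Log error into console
+Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage);
+
+// Send mail to administratore mail
+
+$locale = new Locale(App::getEnv('_APP_LOCALE', 'en'));
+if (!$locale->getText('emails.sender') || !$locale->getText("emails.certificate.hello") || !$locale->getText("emails.certificate.subject") || !$locale->getText("emails.certificate.body") || !$locale->getText("emails.certificate.footer") || !$locale->getText("emails.certificate.thanks") || !$locale->getText("emails.certificate.signature")) {
+$locale->setDefault('en');
+}
+
+$body = Template::fromFile(__DIR__ . '/../../../../app/config/locale/templates/email-base.tpl');
+
+$subject = \sprintf($locale->getText("emails.certificate.subject"), $domain);
+$body
+->setParam('{{domain}}', $domain)
+->setParam('{{error}}', $errorMessage)
+->setParam('{{attempt}}', $attempt)
+->setParam('{{subject}}', $subject)
+->setParam('{{hello}}', $locale->getText("emails.certificate.hello"))
+->setParam('{{body}}', $locale->getText("emails.certificate.body"))
+->setParam('{{redirect}}', 'https://' . $domain)
+->setParam('{{footer}}', $locale->getText("emails.certificate.footer"))
+->setParam('{{thanks}}', $locale->getText("emails.certificate.thanks"))
+->setParam('{{signature}}', $locale->getText("emails.certificate.signature"))
+->setParam('{{project}}', 'Console')
+->setParam('{{direction}}', $locale->getText('settings.direction'))
+->setParam('{{bg-body}}', '#f7f7f7')
+->setParam('{{bg-content}}', '#ffffff')
+->setParam('{{text-content}}', '#000000');
+
+$queueForMails
+->setRecipient(App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS'))
+->setBody($body->render())
+->setName('Appwrite Administrator')
+->trigger();
+}
+
+/**
+* Update all existing domain documents so they have relation to correct certificate document.
+* This solved issues:
+* - when adding a domain for which there is already a certificate
+* - when renew creates new document? It might?
+* - overall makes it more reliable
+*
+* @param string $certificateId ID of a new or updated certificate document
+* @param string $domain Domain that is affected by new certificate
+* @param bool $success Was certificate generation successful?
+*
+* @return void
+*/
+private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void
+{
+
+$rule = $dbForConsole->findOne('rules', [
+Query::equal('domain', [$domain]),
+]);
+
+if ($rule !== false && !$rule->isEmpty()) {
+$rule->setAttribute('certificateId', $certificateId);
+$rule->setAttribute('status', $success ? 'verified' : 'unverified');
+$dbForConsole->updateDocument('rules', $rule->getId(), $rule);
+
+$projectId = $rule->getAttribute('projectId');
+
+// Skip events for console project (triggered by auto-ssl generation for 1 click setups)
+if ($projectId === 'console') {
+return;
+}
+
+$project = $dbForConsole->getDocument('projects', $projectId);
+
+/** Trigger Webhook */
+$ruleModel = new Rule();
+$queueForEvents
+->setProject($project)
+->setEvent('rules.[ruleId].update')
+->setParam('ruleId', $rule->getId())
+->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
+->trigger();
+
+
+/** Trigger Functions */
+$queueForFunctions
+->setProject($project)
+->setEvent('rules.[ruleId].update')
+->setParam('ruleId', $rule->getId())
+->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
+->trigger();
+
+/** Trigger realtime event */
+$allEvents = Event::generateEvents('rules.[ruleId].update', [
+'ruleId' => $rule->getId(),
+]);
+$target = Realtime::fromPayload(
+// Pass first, most verbose event pattern
+event: $allEvents[0],
+payload: $rule,
+project: $project
+);
+Realtime::send(
+projectId: 'console',
+payload: $rule->getArrayCopy(),
+events: $allEvents,
+channels: $target['channels'],
+roles: $target['roles']
+);
+Realtime::send(
+projectId: $project->getId(),
+payload: $rule->getArrayCopy(),
+events: $allEvents,
+channels: $target['channels'],
+roles: $target['roles']
+);
+}
+}
+}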
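Review bot: In `issueCertificate()` the certbot command line is built by plain concatenation of `$email`, `$folder` and `$domain`, and `$domain` comes from the queue payload in `action()`; when `skipRenewCheck` is true `validateDomain()` is never called, so unless `Domain` itself rejects unusual input the value reaches `Console::execute()` unchecked. Assuming the string form of `Console::execute()` is run through a shell, each value should be quoted, e.g. `. " -d " . \escapeshellarg($domain)`, and likewise for `--email`, `--cert-name` and `-w`. The same `$domain` is joined into paths under `APP_STORAGE_CERTIFICATES` and `APP_STORAGE_CONFIG` and interpolated into the Traefik YAML in `applyCertificateFiles()`, so the empty and public-suffix checks in `validateDomain()` should still run for forced jobs, with only the CNAME lookup and renew-date check skipped. Separately, `getRenewDate()` passes `$validTo` to `setTimestamp()` without the empty check that `isRenewRequired()` performs, so an unparsable cert.pem would surface as a type error instead of the 'Unable to read certificate file (cert.pem).' message.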
622
src/Appwrite/Platform/Workers/Databases.php
Normal file
622
src/Appwrite/Platform/Workers/Databases.php
Normal file
|
@ -0,0 +1,622 @@
|
|||
<?php
|
||||
|
||||
namespace Appwrite\Platform\Workers;
|
||||
|
||||
use Appwrite\Event\Event;
|
||||
use Appwrite\Messaging\Adapter\Realtime;
|
||||
use Appwrite\Utopia\Response\Model\Platform;
|
||||
use Exception;
|
||||
use Utopia\Audit\Audit;
|
||||
use Utopia\CLI\Console;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Exception\Authorization;
|
||||
use Utopia\Database\Exception\Conflict;
|
||||
use Utopia\Database\Exception\Restricted;
|
||||
use Utopia\Database\Exception\Structure;
|
||||
use Utopia\Database\Exception as DatabaseException;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Queue\Message;
|
||||
|
||||
class Databases extends Action
|
||||
{
|
||||
public static function getName(): string
|
||||
{
|
||||
return 'databases';
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function __construct()
|
||||
{
|
||||
$this
|
||||
->desc('Databases worker')
|
||||
->inject('message')
|
||||
->inject('dbForConsole')
|
||||
->inject('dbForProject')
|
||||
->callback(fn($message, $dbForConsole, $dbForProject) => $this->action($message, $dbForConsole, $dbForProject));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Message $message
|
||||
* @param Database $dbForConsole
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function action(Message $message, Database $dbForConsole, Database $dbForProject): void
|
||||
{
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
if (empty($payload)) {
|
||||
throw new \Exception('Missing payload');
|
||||
}
|
||||
|
||||
$type = $payload['type'];
|
||||
$project = new Document($payload['project']);
|
||||
$collection = new Document($payload['collection'] ?? []);
|
||||
$document = new Document($payload['document'] ?? []);
|
||||
$database = new Document($payload['database'] ?? []);
|
||||
|
||||
if ($database->isEmpty()) {
|
||||
throw new Exception('Missing database');
|
||||
}
|
||||
|
||||
match (strval($type)) {
|
||||
DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject),
|
||||
DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject),
|
||||
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject),
|
||||
default => Console::error('No database operation for type: ' . $type),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Document $database
|
||||
* @param Document $collection
|
||||
* @param Document $attribute
|
||||
* @param Document $project
|
||||
* @param Database $dbForConsole
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws Authorization
|
||||
* @throws Conflict
|
||||
* @throws \Exception
|
||||
*/
|
||||
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject): void
|
||||
{
|
||||
if ($collection->isEmpty()) {
|
||||
throw new Exception('Missing collection');
|
||||
}
|
||||
if ($attribute->isEmpty()) {
|
||||
throw new Exception('Missing attribute');
|
||||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
|
||||
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [
|
||||
'databaseId' => $database->getId(),
|
||||
'collectionId' => $collection->getId(),
|
||||
'attributeId' => $attribute->getId()
|
||||
]);
|
||||
/**
|
||||
* TODO @christyjacob4 verify if this is still the case
|
||||
* Fetch attribute from the database, since with Resque float values are loosing informations.
|
||||
*/
|
||||
$attribute = $dbForProject->getDocument('attributes', $attribute->getId());
|
||||
|
||||
$collectionId = $collection->getId();
|
||||
$key = $attribute->getAttribute('key', '');
|
||||
$type = $attribute->getAttribute('type', '');
|
||||
$size = $attribute->getAttribute('size', 0);
|
||||
$required = $attribute->getAttribute('required', false);
|
||||
$default = $attribute->getAttribute('default', null);
|
||||
$signed = $attribute->getAttribute('signed', true);
|
||||
$array = $attribute->getAttribute('array', false);
|
||||
$format = $attribute->getAttribute('format', '');
|
||||
$formatOptions = $attribute->getAttribute('formatOptions', []);
|
||||
$filters = $attribute->getAttribute('filters', []);
|
||||
$options = $attribute->getAttribute('options', []);
|
||||
$project = $dbForConsole->getDocument('projects', $projectId);
|
||||
|
||||
|
||||
try {
|
||||
switch ($type) {
|
||||
case Database::VAR_RELATIONSHIP:
|
||||
$relatedCollection = $dbForProject->getDocument('database_' . $database->getInternalId(), $options['relatedCollection']);
|
||||
if ($relatedCollection->isEmpty()) {
|
||||
throw new DatabaseException('Collection not found');
|
||||
}
|
||||
|
||||
if (
|
||||
!$dbForProject->createRelationship(
|
||||
collection: 'database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(),
|
||||
relatedCollection: 'database_' . $database->getInternalId() . '_collection_' . $relatedCollection->getInternalId(),
|
||||
type: $options['relationType'],
|
||||
twoWay: $options['twoWay'],
|
||||
id: $key,
|
||||
twoWayKey: $options['twoWayKey'],
|
||||
onDelete: $options['onDelete'],
|
||||
)
|
||||
) {
|
||||
throw new DatabaseException('Failed to create Attribute');
|
||||
}
|
||||
|
||||
if ($options['twoWay']) {
|
||||
$relatedAttribute = $dbForProject->getDocument('attributes', $database->getInternalId() . '_' . $relatedCollection->getInternalId() . '_' . $options['twoWayKey']);
|
||||
$dbForProject->updateDocument('attributes', $relatedAttribute->getId(), $relatedAttribute->setAttribute('status', 'available'));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) {
|
||||
throw new \Exception('Failed to create Attribute');
|
||||
}
|
||||
}
|
||||
|
||||
$dbForProject->updateDocument('attributes', $attribute->getId(), $attribute->setAttribute('status', 'available'));
|
||||
} catch (\Exception $e) {
|
||||
Console::error($e->getMessage());
|
||||
|
||||
if ($e instanceof DatabaseException) {
|
||||
$attribute->setAttribute('error', $e->getMessage());
|
||||
if (isset($relatedAttribute)) {
|
||||
$relatedAttribute->setAttribute('error', $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
$dbForProject->updateDocument(
|
||||
'attributes',
|
||||
$attribute->getId(),
|
||||
$attribute->setAttribute('status', 'failed')
|
||||
);
|
||||
|
||||
if (isset($relatedAttribute)) {
|
||||
$dbForProject->updateDocument(
|
||||
'attributes',
|
||||
$relatedAttribute->getId(),
|
||||
$relatedAttribute->setAttribute('status', 'failed')
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
$this->trigger($database, $collection, $attribute, $project, $projectId, $events);
|
||||
}
|
||||
|
||||
if ($type === Database::VAR_RELATIONSHIP && $options['twoWay']) {
|
||||
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId());
|
||||
}
|
||||
|
||||
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Document $database
|
||||
* @param Document $collection
|
||||
* @param Document $attribute
|
||||
* @param Document $project
|
||||
* @param Database $dbForConsole
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws Authorization
|
||||
* @throws Conflict
|
||||
* @throws \Exception
|
||||
**/
|
||||
private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject): void
|
||||
{
|
||||
if ($collection->isEmpty()) {
|
||||
throw new Exception('Missing collection');
|
||||
}
|
||||
if ($attribute->isEmpty()) {
|
||||
throw new Exception('Missing attribute');
|
||||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
|
||||
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [
|
||||
'databaseId' => $database->getId(),
|
||||
'collectionId' => $collection->getId(),
|
||||
'attributeId' => $attribute->getId()
|
||||
]);
|
||||
$collectionId = $collection->getId();
|
||||
$key = $attribute->getAttribute('key', '');
|
||||
$status = $attribute->getAttribute('status', '');
|
||||
$type = $attribute->getAttribute('type', '');
|
||||
$project = $dbForConsole->getDocument('projects', $projectId);
|
||||
$options = $attribute->getAttribute('options', []);
|
||||
$relatedAttribute = new Document();
|
||||
$relatedCollection = new Document();
|
||||
// possible states at this point:
|
||||
// - available: should not land in queue; controller flips these to 'deleting'
|
||||
// - processing: hasn't finished creating
|
||||
// - deleting: was available, in deletion queue for first time
|
||||
// - failed: attribute was never created
|
||||
// - stuck: attribute was available but cannot be removed
|
||||
|
||||
try {
|
||||
if ($status !== 'failed') {
|
||||
if ($type === Database::VAR_RELATIONSHIP) {
|
||||
if ($options['twoWay']) {
|
||||
$relatedCollection = $dbForProject->getDocument('database_' . $database->getInternalId(), $options['relatedCollection']);
|
||||
if ($relatedCollection->isEmpty()) {
|
||||
throw new DatabaseException('Collection not found');
|
||||
}
|
||||
$relatedAttribute = $dbForProject->getDocument('attributes', $database->getInternalId() . '_' . $relatedCollection->getInternalId() . '_' . $options['twoWayKey']);
|
||||
}
|
||||
|
||||
if (!$dbForProject->deleteRelationship('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) {
|
||||
$dbForProject->updateDocument('attributes', $relatedAttribute->getId(), $relatedAttribute->setAttribute('status', 'stuck'));
|
||||
throw new DatabaseException('Failed to delete Relationship');
|
||||
}
|
||||
} elseif (!$dbForProject->deleteAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) {
|
||||
throw new DatabaseException('Failed to delete Attribute');
|
||||
}
|
||||
}
|
||||
|
||||
$dbForProject->deleteDocument('attributes', $attribute->getId());
|
||||
|
||||
if (!$relatedAttribute->isEmpty()) {
|
||||
$dbForProject->deleteDocument('attributes', $relatedAttribute->getId());
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Console::error($e->getMessage());
|
||||
|
||||
if ($e instanceof DatabaseException) {
|
||||
$attribute->setAttribute('error', $e->getMessage());
|
||||
if (!$relatedAttribute->isEmpty()) {
|
||||
$relatedAttribute->setAttribute('error', $e->getMessage());
|
||||
}
|
||||
}
|
||||
$dbForProject->updateDocument(
|
||||
'attributes',
|
||||
$attribute->getId(),
|
||||
$attribute->setAttribute('status', 'stuck')
|
||||
);
|
||||
if (!$relatedAttribute->isEmpty()) {
|
||||
$dbForProject->updateDocument(
|
||||
'attributes',
|
||||
$relatedAttribute->getId(),
|
||||
$relatedAttribute->setAttribute('status', 'stuck')
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
$this->trigger($database, $collection, $attribute, $project, $projectId, $events);
|
||||
}
|
||||
|
||||
// The underlying database removes/rebuilds indexes when attribute is removed
|
||||
// Update indexes table with changes
|
||||
/** @var Document[] $indexes */
|
||||
$indexes = $collection->getAttribute('indexes', []);
|
||||
|
||||
foreach ($indexes as $index) {
|
||||
/** @var string[] $attributes */
|
||||
$attributes = $index->getAttribute('attributes');
|
||||
$lengths = $index->getAttribute('lengths');
|
||||
$orders = $index->getAttribute('orders');
|
||||
|
||||
$found = \array_search($key, $attributes);
|
||||
|
||||
if ($found !== false) {
|
||||
// If found, remove entry from attributes, lengths, and orders
|
||||
// array_values wraps array_diff to reindex array keys
|
||||
// when found attribute is removed from array
|
||||
$attributes = \array_values(\array_diff($attributes, [$attributes[$found]]));
|
||||
$lengths = \array_values(\array_diff($lengths, isset($lengths[$found]) ? [$lengths[$found]] : []));
|
||||
$orders = \array_values(\array_diff($orders, isset($orders[$found]) ? [$orders[$found]] : []));
|
||||
|
||||
if (empty($attributes)) {
|
||||
$dbForProject->deleteDocument('indexes', $index->getId());
|
||||
} else {
|
||||
$index
|
||||
->setAttribute('attributes', $attributes, Document::SET_TYPE_ASSIGN)
|
||||
->setAttribute('lengths', $lengths, Document::SET_TYPE_ASSIGN)
|
||||
->setAttribute('orders', $orders, Document::SET_TYPE_ASSIGN);
|
||||
|
||||
// Check if an index exists with the same attributes and orders
|
||||
$exists = false;
|
||||
foreach ($indexes as $existing) {
|
||||
if (
|
||||
$existing->getAttribute('key') !== $index->getAttribute('key') // Ignore itself
|
||||
&& $existing->getAttribute('attributes') === $index->getAttribute('attributes')
|
||||
&& $existing->getAttribute('orders') === $index->getAttribute('orders')
|
||||
) {
|
||||
$exists = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ($exists) { // Delete the duplicate if created, else update in db
|
||||
$this->deleteIndex($database, $collection, $index, $project, $dbForConsole, $dbForProject);
|
||||
} else {
|
||||
$dbForProject->updateDocument('indexes', $index->getId(), $index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
|
||||
$dbForProject->deleteCachedCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
|
||||
|
||||
if (!$relatedCollection->isEmpty() && !$relatedAttribute->isEmpty()) {
|
||||
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId());
|
||||
$dbForProject->deleteCachedCollection('database_' . $database->getInternalId() . '_collection_' . $relatedCollection->getInternalId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Document $database
|
||||
* @param Document $collection
|
||||
* @param Document $index
|
||||
* @param Document $project
|
||||
* @param Database $dbForConsole
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws Authorization
|
||||
* @throws Conflict
|
||||
* @throws Structure
|
||||
* @throws DatabaseException
|
||||
*/
|
||||
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void
|
||||
{
|
||||
if ($collection->isEmpty()) {
|
||||
throw new Exception('Missing collection');
|
||||
}
|
||||
if ($index->isEmpty()) {
|
||||
throw new Exception('Missing index');
|
||||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
|
||||
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [
|
||||
'databaseId' => $database->getId(),
|
||||
'collectionId' => $collection->getId(),
|
||||
'indexId' => $index->getId()
|
||||
]);
|
||||
$collectionId = $collection->getId();
|
||||
$key = $index->getAttribute('key', '');
|
||||
$type = $index->getAttribute('type', '');
|
||||
$attributes = $index->getAttribute('attributes', []);
|
||||
$lengths = $index->getAttribute('lengths', []);
|
||||
$orders = $index->getAttribute('orders', []);
|
||||
$project = $dbForConsole->getDocument('projects', $projectId);
|
||||
|
||||
try {
|
||||
if (!$dbForProject->createIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $attributes, $lengths, $orders)) {
|
||||
throw new DatabaseException('Failed to create Index');
|
||||
}
|
||||
$dbForProject->updateDocument('indexes', $index->getId(), $index->setAttribute('status', 'available'));
|
||||
} catch (\Exception $e) {
|
||||
Console::error($e->getMessage());
|
||||
|
||||
if ($e instanceof DatabaseException) {
|
||||
$index->setAttribute('error', $e->getMessage());
|
||||
}
|
||||
$dbForProject->updateDocument(
|
||||
'indexes',
|
||||
$index->getId(),
|
||||
$index->setAttribute('status', 'failed')
|
||||
);
|
||||
} finally {
|
||||
$this->trigger($database, $collection, $index, $project, $projectId, $events);
|
||||
}
|
||||
|
||||
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collectionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Document $database
|
||||
* @param Document $collection
|
||||
* @param Document $index
|
||||
* @param Document $project
|
||||
* @param Database $dbForConsole
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws Authorization
|
||||
* @throws Conflict
|
||||
* @throws Structure
|
||||
* @throws DatabaseException
|
||||
*/
|
||||
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void
|
||||
{
|
||||
if ($collection->isEmpty()) {
|
||||
throw new Exception('Missing collection');
|
||||
}
|
||||
if ($index->isEmpty()) {
|
||||
throw new Exception('Missing index');
|
||||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
|
||||
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [
|
||||
'databaseId' => $database->getId(),
|
||||
'collectionId' => $collection->getId(),
|
||||
'indexId' => $index->getId()
|
||||
]);
|
||||
$key = $index->getAttribute('key');
|
||||
$status = $index->getAttribute('status', '');
|
||||
$project = $dbForConsole->getDocument('projects', $projectId);
|
||||
|
||||
try {
|
||||
if ($status !== 'failed' && !$dbForProject->deleteIndex('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key)) {
|
||||
throw new DatabaseException('Failed to delete index');
|
||||
}
|
||||
$dbForProject->deleteDocument('indexes', $index->getId());
|
||||
$index->setAttribute('status', 'deleted');
|
||||
} catch (\Exception $e) {
|
||||
Console::error($e->getMessage());
|
||||
|
||||
if ($e instanceof DatabaseException) {
|
||||
$index->setAttribute('error', $e->getMessage());
|
||||
}
|
||||
$dbForProject->updateDocument(
|
||||
'indexes',
|
||||
$index->getId(),
|
||||
$index->setAttribute('status', 'stuck')
|
||||
);
|
||||
} finally {
|
||||
$this->trigger($database, $collection, $index, $project, $projectId, $events);
|
||||
}
|
||||
|
||||
$dbForProject->deleteCachedDocument('database_' . $database->getInternalId(), $collection->getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Document $database
|
||||
* @param Document $project
|
||||
* @param $dbForProject
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function deleteDatabase(Document $database, Document $project, $dbForProject): void
|
||||
{
|
||||
$this->deleteByGroup('database_' . $database->getInternalId(), [], $dbForProject, function ($collection) use ($database, $project, $dbForProject) {
|
||||
$this->deleteCollection($database, $collection, $project, $dbForProject);
|
||||
});
|
||||
|
||||
$dbForProject->deleteCollection('database_' . $database->getInternalId());
|
||||
|
||||
$this->deleteAuditLogsByResource('database/' . $database->getId(), $project, $dbForProject);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param Document $database
|
||||
* @param Document $collection
|
||||
* @param Document $project
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws Authorization
|
||||
* @throws Conflict
|
||||
* @throws DatabaseException
|
||||
* @throws Restricted
|
||||
* @throws Structure
|
||||
*/
|
||||
protected function deleteCollection(Document $database, Document $collection, Document $project, Database $dbForProject): void
|
||||
{
|
||||
if ($collection->isEmpty()) {
|
||||
throw new Exception('Missing collection');
|
||||
}
|
||||
|
||||
$collectionId = $collection->getId();
|
||||
$collectionInternalId = $collection->getInternalId();
|
||||
$databaseId = $database->getId();
|
||||
$databaseInternalId = $database->getInternalId();
|
||||
|
||||
$relationships = \array_filter(
|
||||
$collection->getAttribute('attributes'),
|
||||
fn ($attribute) => $attribute['type'] === Database::VAR_RELATIONSHIP
|
||||
);
|
||||
|
||||
foreach ($relationships as $relationship) {
|
||||
if (!$relationship['twoWay']) {
|
||||
continue;
|
||||
}
|
||||
$relatedCollection = $dbForProject->getDocument('database_' . $databaseInternalId, $relationship['relatedCollection']);
|
||||
$dbForProject->deleteDocument('attributes', $databaseInternalId . '_' . $relatedCollection->getInternalId() . '_' . $relationship['twoWayKey']);
|
||||
$dbForProject->deleteCachedDocument('database_' . $databaseInternalId, $relatedCollection->getId());
|
||||
$dbForProject->deleteCachedCollection('database_' . $databaseInternalId . '_collection_' . $relatedCollection->getInternalId());
|
||||
}
|
||||
|
||||
$dbForProject->deleteCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId());
|
||||
|
||||
$this->deleteByGroup('attributes', [
|
||||
Query::equal('databaseInternalId', [$databaseInternalId]),
|
||||
Query::equal('collectionInternalId', [$collectionInternalId])
|
||||
], $dbForProject);
|
||||
|
||||
$this->deleteByGroup('indexes', [
|
||||
Query::equal('databaseInternalId', [$databaseInternalId]),
|
||||
Query::equal('collectionInternalId', [$collectionInternalId])
|
||||
], $dbForProject);
|
||||
|
||||
$this->deleteAuditLogsByResource('database/' . $databaseId . '/collection/' . $collectionId, $project, $dbForProject);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $resource
|
||||
* @param Document $project
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function deleteAuditLogsByResource(string $resource, Document $project, Database $dbForProject): void
|
||||
{
|
||||
$this->deleteByGroup(Audit::COLLECTION, [
|
||||
Query::equal('resource', [$resource])
|
||||
], $dbForProject);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $collection collectionID
|
||||
* @param array $queries
|
||||
* @param Database $database
|
||||
* @param callable|null $callback
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void
|
||||
{
|
||||
$count = 0;
|
||||
$chunk = 0;
|
||||
$limit = 50;
|
||||
$sum = $limit;
|
||||
|
||||
$executionStart = \microtime(true);
|
||||
|
||||
while ($sum === $limit) {
|
||||
$chunk++;
|
||||
|
||||
$results = $database->find($collection, \array_merge([Query::limit($limit)], $queries));
|
||||
|
||||
$sum = count($results);
|
||||
|
||||
Console::info('Deleting chunk #' . $chunk . '. Found ' . $sum . ' documents');
|
||||
|
||||
foreach ($results as $document) {
|
||||
if ($database->deleteDocument($document->getCollection(), $document->getId())) {
|
||||
Console::success('Deleted document "' . $document->getId() . '" successfully');
|
||||
|
||||
if (\is_callable($callback)) {
|
||||
$callback($document);
|
||||
}
|
||||
} else {
|
||||
Console::error('Failed to delete document: ' . $document->getId());
|
||||
}
|
||||
$count++;
|
||||
}
|
||||
}
|
||||
|
||||
$executionEnd = \microtime(true);
|
||||
|
||||
Console::info("Deleted {$count} document by group in " . ($executionEnd - $executionStart) . " seconds");
|
||||
}
|
||||
|
||||
protected function trigger(
|
||||
Document $database,
|
||||
Document $collection,
|
||||
Document $attribute,
|
||||
Document $project,
|
||||
string $projectId,
|
||||
array $events
|
||||
): void {
|
||||
$target = Realtime::fromPayload(
|
||||
// Pass first, most verbose event pattern
|
||||
event: $events[0],
|
||||
payload: $attribute,
|
||||
project: $project,
|
||||
);
|
||||
Realtime::send(
|
||||
projectId: 'console',
|
||||
payload: $attribute->getArrayCopy(),
|
||||
events: $events,
|
||||
channels: $target['channels'],
|
||||
roles: $target['roles'],
|
||||
options: [
|
||||
'projectId' => $projectId,
|
||||
'databaseId' => $database->getId(),
|
||||
'collectionId' => $collection->getId()
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
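--- a/src/Appwrite/Platform/Workers/Mails.php
+++ b/src/Appwrite/Platform/Workers/Mails.php
@@ -0,0 +1,126 @@
+<?php
+
+namespace Appwrite\Platform\Workers;
+
+use Appwrite\Template\Template;
+use Exception;
+use PHPMailer\PHPMailer\PHPMailer;
+use Swoole\Runtime;
+use Utopia\App;
+use Utopia\CLI\Console;
+use Utopia\Platform\Action;
+use Utopia\Queue\Message;
+use Utopia\Registry\Registry;
+
+class Mails extends Action
+{
+public static function getName(): string
+{
+return 'mails';
+}
+
+/**
+* @throws Exception
+*/
+public function __construct()
+{
+$this
+->desc('Mails worker')
+->inject('message')
+->inject('register')
+->callback(fn($message, $register) => $this->action($message, $register));
+}
+
+/**
+* @param Message $message
+* @param Registry $register
+* @throws \PHPMailer\PHPMailer\Exception
+* @return void
+* @throws Exception
+*/
+public function action(Message $message, Registry $register): void
+{
+Runtime::setHookFlags(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP);
+$payload = $message->getPayload() ?? [];
+
+if (empty($payload)) {
+throw new Exception('Missing payload');
+}
+
+$smtp = $payload['smtp'];
+
+if (empty($smtp) && empty(App::getEnv('_APP_SMTP_HOST'))) {
+Console::info('Skipped mail processing. No SMTP configuration has been set.');
+return;
+}
+
+$recipient = $payload['recipient'];
+$subject = $payload['subject'];
+$variables = $payload['variables'];
+$name = $payload['name'];
+$body = Template::fromFile(__DIR__ . '/../../../../app/config/locale/templates/email-base.tpl');
+
+foreach ($variables as $key => $value) {
+$body->setParam('{{' . $key . '}}', $value);
+}
+
+$body = $body->render();
+
+/** @var PHPMailer $mail */
+$mail = empty($smtp)
+? $register->get('smtp')
+: $this->getMailer($smtp);
+
+$mail->clearAddresses();
+$mail->clearAllRecipients();
+$mail->clearReplyTos();
+$mail->clearAttachments();
+$mail->clearBCCs();
+$mail->clearCCs();
+$mail->addAddress($recipient, $name);
+$mail->Subject = $subject;
+$mail->Body = $body;
+$mail->AltBody = \strip_tags($body);
+
+try {
+$mail->send();
+} catch (\Exception $error) {
+throw new Exception('Error sending mail: ' . $error->getMessage(), 500);
+}
+}
+
+/**
+* @param array $smtp
+* @return PHPMailer
+* @throws \PHPMailer\PHPMailer\Exception
+*/
+protected function getMailer(array $smtp): PHPMailer
+{
+$mail = new PHPMailer(true);
+
+$mail->isSMTP();
+
+$username = $smtp['username'];
+$password = $smtp['password'];
+
+$mail->XMailer = 'Appwrite Mailer';
+$mail->Host = $smtp['host'];
+$mail->Port = $smtp['port'];
+$mail->SMTPAuth = (!empty($username) && !empty($password));
+$mail->Username = $username;
+$mail->Password = $password;
+$mail->SMTPSecure = $smtp['secure'];
+$mail->SMTPAutoTLS = false;
+$mail->CharSet = 'UTF-8';
+
+$mail->setFrom($smtp['senderEmail'], $smtp['senderName']);
+
+if (!empty($smtp['replyTo'])) {
+$mail->addReplyTo($smtp['replyTo'], $smtp['senderName']);
+}
+
+$mail->isHTML();
+
+return $mail;
+}
+}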
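Review bot: `getMailer()` sets `$mail->SMTPAutoTLS = false` while taking `SMTPSecure` straight from `$smtp['secure']`, so a custom SMTP configuration with an empty `secure` value and a username/password will authenticate and send over an unencrypted connection, with no opportunistic STARTTLS to fall back on. Either leave `SMTPAutoTLS` at PHPMailer's default or refuse the combination, e.g. `if ($mail->SMTPAuth && empty($smtp['secure'])) { throw new Exception('SMTP authentication requires TLS'); }`. In `action()`, `$payload['smtp']`, `['recipient']`, `['subject']`, `['variables']` and `['name']` are read without defaults, so a message that lacks `smtp` triggers an undefined-key warning before the "No SMTP configuration" skip can apply; `$smtp = $payload['smtp'] ?? [];` would match the intent. Whether `Template::setParam()` HTML-escapes the `$variables` values cannot be seen from this file and is worth confirming, since the rendered body is sent as HTML.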
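--- a/src/Appwrite/Platform/Workers/Migrations.php
+++ b/src/Appwrite/Platform/Workers/Migrations.php
@@ -0,0 +1,330 @@
+<?php
+
+namespace Appwrite\Platform\Workers;
+
+use Exception;
+use Utopia\Database\Document;
+use Utopia\Database\Exception\Authorization;
+use Utopia\Database\Exception\Conflict;
+use Utopia\Database\Exception\Restricted;
+use Utopia\Database\Exception\Structure;
+use Utopia\Platform\Action;
+use Utopia\Queue\Message;
+use Appwrite\Event\Event;
+use Appwrite\Messaging\Adapter\Realtime;
+use Appwrite\Permission;
+use Appwrite\Role;
+use Utopia\CLI\Console;
+use Utopia\Database\Database;
+use Utopia\Database\Helpers\ID;
+use Utopia\Migration\Destinations\Appwrite as DestinationsAppwrite;
+use Utopia\Migration\Resource;
+use Utopia\Migration\Source;
+use Utopia\Migration\Sources\Appwrite;
+use Utopia\Migration\Sources\Firebase;
+use Utopia\Migration\Sources\NHost;
+use Utopia\Migration\Sources\Supabase;
+use Utopia\Migration\Transfer;
+
+class Migrations extends Action
+{
+private ?Database $dbForProject = null;
+private ?Database $dbForConsole = null;
+
+public static function getName(): string
+{
+return 'migrations';
+}
+
+/**
+* @throws Exception
+*/
+public function __construct()
+{
+$this
+->desc('Migrations worker')
+->inject('message')
+->inject('dbForProject')
+->inject('dbForConsole')
+->callback(fn(Message $message, Database $dbForProject, Database $dbForConsole) => $this->action($message, $dbForProject, $dbForConsole));
+}
+
+/**
+* @param Message $message
+* @param Database $dbForProject
+* @param Database $dbForConsole
+* @return void
+* @throws Exception
+*/
+public function action(Message $message, Database $dbForProject, Database $dbForConsole): void
+{
+$payload = $message->getPayload() ?? [];
+
+if (empty($payload)) {
+throw new Exception('Missing payload');
+}
+
+$events = $payload['events'] ?? [];
+$project = new Document($payload['project'] ?? []);
+$migration = new Document($payload['migration'] ?? []);
+
+if ($project->getId() === 'console') {
+return;
+}
+
+$this->dbForProject = $dbForProject;
+$this->dbForConsole = $dbForConsole;
+
+/**
+* Handle Event execution.
+*/
+if (! empty($events)) {
+return;
+}
+
+$this->processMigration($project, $migration);
+}
+
+/**
+* @param string $source
+* @param array $credentials
+* @return Source
+* @throws Exception
+*/
+protected function processSource(string $source, array $credentials): Source
+{
+return match ($source) {
+Firebase::getName() => new Firebase(
+json_decode($credentials['serviceAccount'], true),
+),
+Supabase::getName() => new Supabase(
+$credentials['endpoint'],
+$credentials['apiKey'],
+$credentials['databaseHost'],
+'postgres',
+$credentials['username'],
+$credentials['password'],
+$credentials['port'],
+),
+NHost::getName() => new NHost(
+$credentials['subdomain'],
+$credentials['region'],
+$credentials['adminSecret'],
+$credentials['database'],
+$credentials['username'],
+$credentials['password'],
+$credentials['port'],
+),
+Appwrite::getName() => new Appwrite($credentials['projectId'], str_starts_with($credentials['endpoint'], 'http://localhost/v1') ? 'http://appwrite/v1' : $credentials['endpoint'], $credentials['apiKey']),
+default => throw new \Exception('Invalid source type'),
+};
+}
+
+/**
+* @throws Authorization
+* @throws Structure
+* @throws Conflict
+* @throws \Utopia\Database\Exception
+* @throws Exception
+*/
+protected function updateMigrationDocument(Document $migration, Document $project): Document
+{
+/** Trigger Realtime */
+$allEvents = Event::generateEvents('migrations.[migrationId].update', [
+'migrationId' => $migration->getId(),
+]);
+
+$target = Realtime::fromPayload(
+event: $allEvents[0],
+payload: $migration,
+project: $project
+);
+
+Realtime::send(
+projectId: 'console',
+payload: $migration->getArrayCopy(),
+events: $allEvents,
+channels: $target['channels'],
+roles: $target['roles'],
+);
+
+Realtime::send(
+projectId: $project->getId(),
+payload: $migration->getArrayCopy(),
+events: $allEvents,
+channels: $target['channels'],
+roles: $target['roles'],
+);
+
+return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration);
+}
+
+/**
+* @param Document $apiKey
+* @return void
+* @throws \Utopia\Database\Exception
+* @throws Authorization
+* @throws Conflict
+* @throws Restricted
+* @throws Structure
+*/
+protected function removeAPIKey(Document $apiKey): void
+{
+$this->dbForConsole->deleteDocument('keys', $apiKey->getId());
+}
+
+/**
+* @param Document $project
+* @return Document
+* @throws Authorization
+* @throws Structure
+* @throws \Utopia\Database\Exception
+* @throws Exception
+*/
+protected function generateAPIKey(Document $project): Document
+{
+$generatedSecret = bin2hex(\random_bytes(128));
+
+$key = new Document([
+'$id' => ID::unique(),
+'$permissions' => [
+Permission::read(Role::any()),
+Permission::update(Role::any()),
+Permission::delete(Role::any()),
+],
+'projectInternalId' => $project->getInternalId(),
+'projectId' => $project->getId(),
+'name' => 'Transfer API Key',
+'scopes' => [
+'users.read',
+'users.write',
+'teams.read',
+'teams.write',
+'databases.read',
+'databases.write',
+'collections.read',
+'collections.write',
+'documents.read',
+'documents.write',
+'buckets.read',
+'buckets.write',
+'files.read',
+'files.write',
+'functions.read',
+'functions.write',
+],
+'expire' => null,
+'sdks' => [],
+'accessedAt' => null,
+'secret' => $generatedSecret,
+]);
+
+$this->dbForConsole->createDocument('keys', $key);
+$this->dbForConsole->deleteCachedDocument('projects', $project->getId());
+
+return $key;
+}
+
+/**
+* @param Document $project
+* @param Document $migration
+* @return void
+* @throws Authorization
+* @throws Conflict
+* @throws Restricted
+* @throws Structure
+* @throws \Utopia\Database\Exception
+*/
+protected function processMigration(Document $project, Document $migration): void
+{
+/**
+* @var Document $migrationDocument
+* @var Transfer $transfer
+*/
+$migrationDocument = null;
+$transfer = null;
+$projectDocument = $this->dbForConsole->getDocument('projects', $project->getId());
+$tempAPIKey = $this->generateAPIKey($projectDocument);
+
+try {
+$migrationDocument = $this->dbForProject->getDocument('migrations', $migration->getId());
+$migrationDocument->setAttribute('stage', 'processing');
+$migrationDocument->setAttribute('status', 'processing');
+$this->updateMigrationDocument($migrationDocument, $projectDocument);
+
+$source = $this->processSource($migrationDocument->getAttribute('source'), $migrationDocument->getAttribute('credentials'));
+
+$source->report();
+
+$destination = new DestinationsAppwrite(
+$projectDocument->getId(),
+'http://appwrite/v1',
+$tempAPIKey['secret'],
+);
+
+$transfer = new Transfer(
+$source,
+$destination
+);
+
+/** Start Transfer */
+$migrationDocument->setAttribute('stage', 'migrating');
+$this->updateMigrationDocument($migrationDocument, $projectDocument);
+$transfer->run($migrationDocument->getAttribute('resources'), function () use ($migrationDocument, $transfer, $projectDocument) {
+$migrationDocument->setAttribute('resourceData', json_encode($transfer->getCache()));
+$migrationDocument->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
+
+$this->updateMigrationDocument($migrationDocument, $projectDocument);
+});
+
+$errors = $transfer->getReport(Resource::STATUS_ERROR);
+
+if (count($errors) > 0) {
+$migrationDocument->setAttribute('status', 'failed');
+$migrationDocument->setAttribute('stage', 'finished');
+
+$errorMessages = [];
+foreach ($errors as $error) {
+$errorMessages[] = "Failed to transfer resource '{$error['id']}:{$error['resource']}' with message '{$error['message']}'";
+}
+
+$migrationDocument->setAttribute('errors', $errorMessages);
+$this->updateMigrationDocument($migrationDocument, $projectDocument);
+
+return;
+}
+
+$migrationDocument->setAttribute('status', 'completed');
+$migrationDocument->setAttribute('stage', 'finished');
+} catch (\Throwable $th) {
+Console::error($th->getMessage());
+
+if ($migrationDocument) {
+Console::error($th->getMessage());
+Console::error($th->getTraceAsString());
+$migrationDocument->setAttribute('status', 'failed');
+$migrationDocument->setAttribute('stage', 'finished');
+$migrationDocument->setAttribute('errors', [$th->getMessage()]);
+
+return;
+}
+
+if ($transfer) {
+$errors = $transfer->getReport(Resource::STATUS_ERROR);
+
+if (count($errors) > 0) {
+$migrationDocument->setAttribute('status', 'failed');
+$migrationDocument->setAttribute('stage', 'finished');
+$migrationDocument->setAttribute('errors', $errors);
+}
+}
+} finally {
+if ($migrationDocument) {
+$this->updateMigrationDocument($migrationDocument, $projectDocument);
+}
+if ($tempAPIKey) {
+$this->removeAPIKey($tempAPIKey);
+}
+}
+}
+}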
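Review bot: `updateMigrationDocument()` passes `$migration->getArrayCopy()` as the Realtime payload to both the console and the project channels, and that same document carries the source `credentials` that `processMigration()` reads back with `getAttribute('credentials')`, so API keys, database passwords and the Firebase service account would ride along in every progress event unless something outside this file filters them. Build the payload once and drop the secret fields before sending, e.g. `$payload = $migration->getArrayCopy(); unset($payload['credentials']);`, then pass `payload: $payload` to both `Realtime::send()` calls. Separately, `generateAPIKey()` creates the temporary key with `'expire' => null`, sixteen read/write scopes and read/update/delete permissions for `Role::any()`; removal happens only in the `finally` block, so a worker that is killed mid-transfer leaves a non-expiring key behind, and a short expiry plus narrower document permissions would bound that. The `if ($transfer)` branch in the `catch` appears unreachable with a non-null `$migrationDocument` (the preceding branch returns), and would dereference null if it ever ran.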
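--- a/src/Appwrite/Platform/Workers/Webhooks.php
+++ b/src/Appwrite/Platform/Workers/Webhooks.php
@@ -0,0 +1,118 @@
+<?php
+
+namespace Appwrite\Platform\Workers;
+
+use Exception;
+use Utopia\App;
+use Utopia\Database\Document;
+use Utopia\Platform\Action;
+use Utopia\Queue\Message;
+
+class Webhooks extends Action
+{
+private array $errors = [];
+
+public static function getName(): string
+{
+return 'webhooks';
+}
+
+/**
+* @throws Exception
+*/
+public function __construct()
+{
+$this
+->desc('Webhooks worker')
+->inject('message')
+->callback(fn($message) => $this->action($message));
+}
+
+/**
+* @param Message $message
+* @return void
+* @throws Exception
+*/
+public function action(Message $message): void
+{
+$payload = $message->getPayload() ?? [];
+
+if (empty($payload)) {
+throw new Exception('Missing payload');
+}
+
+$events = $payload['events'];
+$webhookPayload = json_encode($payload['payload']);
+$project = new Document($payload['project']);
+$user = new Document($payload['user'] ?? []);
+
+foreach ($project->getAttribute('webhooks', []) as $webhook) {
+if (array_intersect($webhook->getAttribute('events', []), $events)) {
+$this->execute($events, $webhookPayload, $webhook, $user, $project);
+}
+}
+
+if (!empty($this->errors)) {
+throw new Exception(\implode(" / \n\n", $this->errors));
+}
+}
+
+/**
+* @param array $events
+* @param string $payload
+* @param Document $webhook
+* @param Document $user
+* @param Document $project
+* @return void
+*/
+private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project): void
+{
+
+$url = \rawurldecode($webhook->getAttribute('url'));
+$signatureKey = $webhook->getAttribute('signatureKey');
+$signature = base64_encode(hash_hmac('sha1', $url . $payload, $signatureKey, true));
+$httpUser = $webhook->getAttribute('httpUser');
+$httpPass = $webhook->getAttribute('httpPass');
+$ch = \curl_init($webhook->getAttribute('url'));
+
+\curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST');
+\curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
+\curl_setopt($ch, CURLOPT_HEADER, 0);
+\curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
+\curl_setopt($ch, CURLOPT_USERAGENT, \sprintf(
+APP_USERAGENT,
+App::getEnv('_APP_VERSION', 'UNKNOWN'),
+App::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS', APP_EMAIL_SECURITY)
+));
+\curl_setopt(
+$ch,
+CURLOPT_HTTPHEADER,
+[
+'Content-Type: application/json',
+'Content-Length: ' . \strlen($payload),
+'X-' . APP_NAME . '-Webhook-Id: ' . $webhook->getId(),
+'X-' . APP_NAME . '-Webhook-Events: ' . implode(',', $events),
+'X-' . APP_NAME . '-Webhook-Name: ' . $webhook->getAttribute('name', ''),
+'X-' . APP_NAME . '-Webhook-User-Id: ' . $user->getId(),
+'X-' . APP_NAME . '-Webhook-Project-Id: ' . $project->getId(),
+'X-' . APP_NAME . '-Webhook-Signature: ' . $signature,
+]
+);
+
+if (!$webhook->getAttribute('security', true)) {
+\curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
+\curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
+}
+
+if (!empty($httpUser) && !empty($httpPass)) {
+\curl_setopt($ch, CURLOPT_USERPWD, "$httpUser:$httpPass");
+\curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
+}
+
+if (false === \curl_exec($ch)) {
+$this->errors[] = \curl_error($ch) . ' in events ' . implode(', ', $events) . ' for webhook ' . $webhook->getAttribute('name');
+}
+
+\curl_close($ch);
+}
+}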
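Review bot: `execute()` passes the project-configured webhook URL straight to `\curl_init()` with no scheme restriction and no timeout, so the worker's outbound request is bounded only by whatever validation happens when the webhook is saved, which this file does not show. A slow or unresponsive receiver can also hold the queue worker for as long as curl's defaults allow. I would pin the transfer to HTTP(S) and bound it, e.g. `\curl_setopt($ch, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS);`, `\curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 5);` and `\curl_setopt($ch, CURLOPT_TIMEOUT, 15);` (timeout values are illustrative). Separately, only `false === \curl_exec($ch)` is recorded as an error, so a 4xx/5xx response from the receiver appears to count as a successful delivery; checking `curl_getinfo($ch, CURLINFO_RESPONSE_CODE)` would make failed deliveries visible in `$this->errors`.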