1
0
Fork 0
mirror of synced 2024-06-03 19:34:54 +12:00
appwrite/src/Appwrite/Resque/Worker.php
2022-05-04 11:20:16 +00:00

292 lines
9 KiB
PHP

<?php
namespace Appwrite\Resque;
use Utopia\App;
use Utopia\Cache\Cache;
use Utopia\Cache\Adapter\Redis as RedisCache;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\Adapter\MariaDB;
use Utopia\Storage\Device;
use Utopia\Storage\Storage;
use Utopia\Storage\Device\Local;
use Utopia\Storage\Device\DOSpaces;
use Utopia\Storage\Device\BackBlaze;
use Utopia\Storage\Device\S3;
use Exception;
abstract class Worker
{
/**
* Callbacks that will be executed when an error occurs
*
* @var array
*/
static protected array $errorCallbacks = [];
/**
* Associative array holding all information passed into the worker
*
* @return array
*/
public array $args = [];
/**
* Function for identifying the worker needs to be set to unique name
*
* @return string
* @throws Exception
*/
public function getName(): string
{
throw new Exception("Please implement getName method in worker");
}
/**
* Function executed before running first task.
* Can include any preparations, such as connecting to external services or loading files
*
* @return void
* @throws \Exception|\Throwable
*/
public function init() {
throw new Exception("Please implement init method in worker");
}
/**
* Function executed when new task requests is received.
* You can access $args here, it will contain event information
*
* @return void
* @throws \Exception|\Throwable
*/
public function run() {
throw new Exception("Please implement run method in worker");
}
/**
* Function executed just before shutting down the worker.
* You can do cleanup here, such as disconnecting from services or removing temp files
*
* @return void
* @throws \Exception|\Throwable
*/
public function shutdown() {
throw new Exception("Please implement shutdown method in worker");
}
const DATABASE_PROJECT = 'project';
const DATABASE_CONSOLE = 'console';
/**
* A wrapper around 'init' function with non-worker-specific code
*
* @return void
* @throws \Exception|\Throwable
*/
public function setUp(): void
{
try {
$this->init();
} catch(\Throwable $error) {
foreach (self::$errorCallbacks as $errorCallback) {
$errorCallback($error, "init", $this->getName());
}
throw $error;
}
}
/**
* A wrapper around 'run' function with non-worker-specific code
*
* @return void
* @throws \Exception|\Throwable
*/
public function perform(): void
{
try {
$this->run();
} catch(\Throwable $error) {
foreach (self::$errorCallbacks as $errorCallback) {
$errorCallback($error, "run", $this->getName(), $this->args);
}
throw $error;
}
}
/**
* A wrapper around 'shutdown' function with non-worker-specific code
*
* @return void
* @throws \Exception|\Throwable
*/
public function tearDown(): void
{
try {
$this->shutdown();
} catch(\Throwable $error) {
foreach (self::$errorCallbacks as $errorCallback) {
$errorCallback($error, "shutdown", $this->getName());
}
throw $error;
}
}
/**
* Register callback. Will be executed when error occurs.
* @param callable $callback
* @param Throwable $error
* @return self
*/
public static function error(callable $callback): void
{
\array_push(self::$errorCallbacks, $callback);
}
/**
* Get internal project database
* @param string $projectId
* @return Database
*/
protected function getProjectDB(string $projectId): Database
{
return $this->getDB(self::DATABASE_PROJECT, $projectId);
}
/**
* Get console database
* @return Database
*/
protected function getConsoleDB(): Database
{
return $this->getDB(self::DATABASE_CONSOLE);
}
/**
* Get console database
* @param string $type One of (internal, external, console)
* @param string $projectId of internal or external DB
* @return Database
*/
private function getDB($type, $projectId = ''): Database
{
global $register;
$namespace = '';
$sleep = DATABASE_RECONNECT_SLEEP; // overwritten when necessary
switch ($type) {
case self::DATABASE_PROJECT:
if (!$projectId) {
throw new \Exception('ProjectID not provided - cannot get database');
}
$namespace = "_{$projectId}";
break;
case self::DATABASE_CONSOLE:
$namespace = "_console";
$sleep = 5; // ConsoleDB needs extra sleep time to ensure tables are created
break;
default:
throw new \Exception('Unknown database type: ' . $type);
break;
}
$attempts = 0;
do {
try {
$attempts++;
$cache = new Cache(new RedisCache($register->get('cache')));
$database = new Database(new MariaDB($register->get('db')), $cache);
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$database->setNamespace($namespace); // Main DB
if (!empty($projectId) && !$database->getDocument('projects', $projectId)->isEmpty()) {
throw new \Exception("Project does not exist: {$projectId}");
}
if ($type === self::DATABASE_CONSOLE && !$database->exists($database->getDefaultDatabase(), 'realtime')) {
throw new \Exception('Console project not ready');
}
break; // leave loop if successful
} catch(\Exception $e) {
Console::warning("Database not ready. Retrying connection ({$attempts})...");
if ($attempts >= DATABASE_RECONNECT_MAX_ATTEMPTS) {
throw new \Exception('Failed to connect to database: '. $e->getMessage());
}
sleep($sleep);
}
} while ($attempts < DATABASE_RECONNECT_MAX_ATTEMPTS);
return $database;
}
/**
* Get Functions Storage Device
* @param string $projectId of the project
* @return Device
*/
protected function getFunctionsDevice($projectId): Device {
return $this->getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $projectId);
}
/**
* Get Files Storage Device
* @param string $projectId of the project
* @return Device
*/
protected function getFilesDevice($projectId): Device {
return $this->getDevice(APP_STORAGE_UPLOADS . '/app-' . $projectId);
}
/**
* Get Builds Storage Device
* @param string $projectId of the project
* @return Device
*/
protected function getBuildsDevice($projectId): Device {
return $this->getDevice(APP_STORAGE_BUILDS . '/app-' . $projectId);
}
/**
* Get Device based on selected storage environment
* @param string $root path of the device
* @return Device
*/
private function getDevice($root): Device
{
switch (App::getEnv('_APP_STORAGE_DEVICE', Storage::DEVICE_LOCAL)) {
case Storage::DEVICE_LOCAL:default:
return new Local($root);
case Storage::DEVICE_S3:
$s3AccessKey = App::getEnv('_APP_STORAGE_S3_ACCESS_KEY', '');
$s3SecretKey = App::getEnv('_APP_STORAGE_S3_SECRET', '');
$s3Region = App::getEnv('_APP_STORAGE_S3_REGION', '');
$s3Bucket = App::getEnv('_APP_STORAGE_S3_BUCKET', '');
$s3Acl = 'private';
return new S3($root, $s3AccessKey, $s3SecretKey, $s3Bucket, $s3Region, $s3Acl);
case Storage::DEVICE_DO_SPACES:
$doSpacesAccessKey = App::getEnv('_APP_STORAGE_DO_SPACES_ACCESS_KEY', '');
$doSpacesSecretKey = App::getEnv('_APP_STORAGE_DO_SPACES_SECRET', '');
$doSpacesRegion = App::getEnv('_APP_STORAGE_DO_SPACES_REGION', '');
$doSpacesBucket = App::getEnv('_APP_STORAGE_DO_SPACES_BUCKET', '');
$doSpacesAcl = 'private';
return new DOSpaces($root, $doSpacesAccessKey, $doSpacesSecretKey, $doSpacesBucket, $doSpacesRegion, $doSpacesAcl);
case Storage::DEVICE_BACKBLAZE:
$backblazeAccessKey = App::getEnv('_APP_STORAGE_BACKBLAZE_ACCESS_KEY', '');
$backblazeSecretKey = App::getEnv('_APP_STORAGE_BACKBLAZE_SECRET', '');
$backblazeRegion = App::getEnv('_APP_STORAGE_BACKBLAZE_REGION', '');
$backblazeBucket = App::getEnv('_APP_STORAGE_BACKBLAZE_BUCKET', '');
$backblazeAcl = 'private';
return new BackBlaze($root, $backblazeAccessKey, $backblazeSecretKey, $backblazeBucket, $backblazeRegion, $backblazeAcl);
}
}
}