diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 46296bee3..5b969e63d 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -5,7 +5,6 @@ use Appwrite\Auth\Auth; use Appwrite\Event\Build; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Func; use Appwrite\Event\Validator\Event as ValidatorEvent; use Appwrite\Extend\Exception; use Appwrite\Utopia\Database\Validator\CustomId; @@ -14,6 +13,7 @@ use Utopia\Database\Permission; use Utopia\Database\Role; use Utopia\Database\Validator\UID; use Appwrite\Usage\Stats; +use Utopia\Pools\Group; use Utopia\Storage\Device; use Utopia\Storage\Validator\File; use Utopia\Storage\Validator\FileExt; @@ -41,6 +41,7 @@ use Utopia\CLI\Console; use Utopia\Database\Validator\Roles; use Utopia\Validator\Boolean; use Utopia\Database\Exception\Duplicate as DuplicateException; +use Utopia\Queue\Client as queue; include_once __DIR__ . '/../shared/api.php'; @@ -1040,8 +1041,6 @@ App::post('/v1/functions/:functionId/deployments/:deploymentId/builds/:buildId') $response->noContent(); }); - - App::post('/v1/functions/:functionId/executions') ->groups(['api', 'functions']) ->desc('Create Execution') @@ -1067,7 +1066,8 @@ App::post('/v1/functions/:functionId/executions') ->inject('events') ->inject('usage') ->inject('mode') - ->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode) { + ->inject('pools') + ->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Group $pools) { $function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId)); @@ -1155,17 +1155,18 @@ App::post('/v1/functions/:functionId/executions') ->setContext('function', $function); if ($async) { - $event = new Func(); - $event - ->setType('http') - ->setExecution($execution) - ->setFunction($function) - ->setData($data) - ->setJWT($jwt) - ->setProject($project) - ->setUser($user); - - $event->trigger(); + $queue = new queue(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource()); + $queue->enqueue([ + 'type' => 'http', + 'value' => [ + 'type' => 'http', + 'execution' => $execution, + 'function' => $function, + 'data' => $data, + 'jwt' => $jwt, + 'project' => $project, + 'user' => $user + ]]); return $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) @@ -1198,11 +1199,11 @@ App::post('/v1/functions/:functionId/executions') deploymentId: $deployment->getId(), path: $build->getAttribute('outputPath', ''), vars: $vars, - data: $data, entrypoint: $deployment->getAttribute('entrypoint', ''), + data: $data, runtime: $function->getAttribute('runtime', ''), - timeout: $function->getAttribute('timeout', 0), - baseImage: $runtime['image'] + baseImage: $runtime['image'], + timeout: $function->getAttribute('timeout', 0) ); /** Update execution status */ diff --git a/app/init.php b/app/init.php index a08dcfd13..450052658 100644 --- a/app/init.php +++ b/app/init.php @@ -38,6 +38,8 @@ use Appwrite\Network\Validator\IP; use Appwrite\Network\Validator\URL; use Appwrite\OpenSSL\OpenSSL; use Appwrite\URL\URL as AppwriteURL; +use Utopia\Queue\Client as SyncOut; +use Utopia\Queue\Connection\Redis as QueueRedis; use Appwrite\Usage\Stats; use Appwrite\Utopia\View; use Utopia\App; @@ -75,6 +77,7 @@ use Ahc\Jwt\JWTException; use MaxMind\Db\Reader; use PHPMailer\PHPMailer\PHPMailer; use Swoole\Database\PDOProxy; +use Utopia\Queue; const APP_NAME = 'Appwrite'; const APP_DOMAIN = 'appwrite.io'; @@ -497,6 +500,7 @@ $register->set('logger', function () { $adapter = new $classname($providerConfig); return new Logger($adapter); }); + $register->set('pools', function () { $group = new Group(); @@ -522,30 +526,35 @@ $register->set('pools', function () { 'dsns' => App::getEnv('_APP_CONNECTIONS_DB_CONSOLE', $fallbackForDB), 'multiple' => false, 'schemes' => ['mariadb', 'mysql'], + 'useResource' => true, ], 'database' => [ 'type' => 'database', 'dsns' => App::getEnv('_APP_CONNECTIONS_DB_PROJECT', $fallbackForDB), 'multiple' => true, 'schemes' => ['mariadb', 'mysql'], + 'useResource' => true, ], 'queue' => [ 'type' => 'queue', 'dsns' => App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis), 'multiple' => false, 'schemes' => ['redis'], + 'useResource' => false, ], 'pubsub' => [ 'type' => 'pubsub', 'dsns' => App::getEnv('_APP_CONNECTIONS_PUBSUB', $fallbackForRedis), 'multiple' => false, 'schemes' => ['redis'], + 'useResource' => true, ], 'cache' => [ 'type' => 'cache', 'dsns' => App::getEnv('_APP_CONNECTIONS_CACHE', $fallbackForRedis), 'multiple' => true, 'schemes' => ['redis'], + 'useResource' => true, ], ]; @@ -554,6 +563,7 @@ $register->set('pools', function () { $dsns = $connection['dsns'] ?? ''; $multipe = $connection['multiple'] ?? false; $schemes = $connection['schemes'] ?? []; + $useResource = $connection['useResource'] ?? true; $config = []; $dsns = explode(',', $connection['dsns'] ?? ''); @@ -576,7 +586,7 @@ $register->set('pools', function () { $dsnScheme = $dsn->getScheme(); $dsnDatabase = $dsn->getDatabase(); - if (!in_array($dsnScheme, $schemes)) { + if (!in_array($dsnScheme, $schemes) && $useResource) { throw new Exception(Exception::GENERAL_SERVER_ERROR, "Invalid console database scheme"); } @@ -635,13 +645,15 @@ $register->set('pools', function () { }; $adapter->setDefaultDatabase($dsn->getDatabase()); - - break; - case 'queue': - $adapter = $resource(); break; case 'pubsub': + break; $adapter = $resource(); + case 'queue': + $adapter = match ($dsn->getScheme()) { + 'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()), + default => 'bla' + }; break; case 'cache': $adapter = match ($dsn->getScheme()) { @@ -664,12 +676,6 @@ $register->set('pools', function () { Config::setParam('pools-' . $key, $config); } - try { - $group->fill(); - } catch (\Throwable $th) { - Console::error('Connection failure: ' . $th->getMessage()); - } - return $group; }); $register->set('influxdb', function () { @@ -847,8 +853,7 @@ App::setResource('messaging', fn() => new Phone()); App::setResource('usage', function ($register) { return new Stats($register->get('statsd')); }, ['register']); - -App::setResource('clients', function ($request, $console, $project) { +App::setResource('clients', function ($request, $console, $project) use ($register) { $console->setAttribute('platforms', [ // Always allow current host '$collection' => ID::custom('platforms'), 'name' => 'Current Host', @@ -1024,6 +1029,24 @@ App::setResource('console', function () { ]); }, []); +App::setResource('queue', function () { + + $fallbackForRedis = AppwriteURL::unparse([ + 'scheme' => 'redis', + 'host' => App::getEnv('_APP_REDIS_HOST', 'redis'), + 'port' => App::getEnv('_APP_REDIS_PORT', '6379'), + 'user' => App::getEnv('_APP_REDIS_USER', ''), + 'pass' => App::getEnv('_APP_REDIS_PASS', ''), + ]); + + $connection = App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis); + + $dsns = explode(',', $connection ?? ''); + $dsn = explode('=', $dsns[0]); + $dsn = $dsn[1] ?? ''; + return new DSN($dsn); +}, []); + App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index a478267b8..f370d7da4 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -3,12 +3,14 @@ global $cli; global $register; +use Appwrite\Event\Event; use Cron\CronExpression; use Utopia\App; use Utopia\CLI\Console; use Utopia\Database\DateTime; use Utopia\Database\Query; use Swoole\Timer; +use Utopia\Queue\Client as worker; const FUNCTION_UPDATE_TIMER = 60; //seconds const FUNCTION_ENQUEUE_TIMER = 60; //seconds @@ -108,7 +110,7 @@ $cli * The timer updates $functions from db on last resourceUpdatedAt attr in X-min. */ Co\run( - function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { + function () use ($register, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { $time = DateTime::now(); $limit = 1000; @@ -162,7 +164,7 @@ $cli /** * The timer sends to worker every 1 min and re-enqueue matched functions. */ - Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) { + Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($register, $dbForConsole, &$functions, &$queue) { $timerStart = \microtime(true); $time = DateTime::now(); $timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME); /** 5 min */ @@ -175,9 +177,17 @@ $cli console::info(count($schedule) . " functions sent to worker for time slot " . $slot); foreach ($schedule as $function) { - /** - * Enqueue function (here should be the Enqueue call - */ + $pools = $register->get('pools'); + $worker = new worker(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource()); + $project = $dbForConsole->getDocument('projects', $function['projectId']); + $worker + ->enqueue([ + 'type' => 'schedule', + 'value' => [ + 'project' => $project, + 'function' => getProjectDB($project)->getDocument('functions', $function['projectId']), + ] + ]); //Console::warning("Enqueueing :{$function['resourceId']}"); $cron = new CronExpression($function['schedule']); $next = DateTime::format($cron->getNextRunDate()); diff --git a/app/worker.php b/app/worker.php new file mode 100644 index 000000000..eaef650d9 --- /dev/null +++ b/app/worker.php @@ -0,0 +1,64 @@ + $register); + +Server::setResource('dbForConsole', function (Cache $cache, Registry $register) { + $pools = $register->get('pools'); + $dbAdapter = $pools + ->get('console') + ->pop() + ->getResource() + ; + + $database = new Database($dbAdapter, $cache); + $database->setNamespace('console'); + + return $database; +}, ['cache', 'register']); + +Server::setResource('cache', function (Registry $register) { + $pools = $register->get('pools'); + $list = Config::getParam('pools-cache', []); + $adapters = []; + + foreach ($list as $value) { + $adapters[] = $pools + ->get($value) + ->pop() + ->getResource() + ; + } + + return new Cache(new Sharding($adapters)); +}, ['register']); + +App::setResource('logger', function ($register) { + return $register->get('logger'); +}, ['register']); + + +$pools = $register->get('pools'); +$client = $pools->get('queue')->pop()->getResource(); + + +$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)); +$workerNumber = 1; + +Runtime::enableCoroutine(SWOOLE_HOOK_ALL); diff --git a/app/workers/functions.php b/app/workers/functions.php index 1ba4b6575..47d36e4ce 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -1,100 +1,315 @@ getId(); + $deploymentId = $function->getAttribute('deployment', ''); + + /** Check if deployment exists */ + $deployment = $dbForProject->getDocument('deployments', $deploymentId); + + if ($deployment->getAttribute('resourceId') !== $functionId) { + throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); } - public function init(): void - { - $this->executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); + if ($deployment->isEmpty()) { + throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); } - public function run(): void - { - $type = $this->args['type'] ?? ''; - $events = $this->args['events'] ?? []; - $project = new Document($this->args['project'] ?? []); - $user = new Document($this->args['user'] ?? []); - $payload = json_encode($this->args['payload'] ?? []); + /** Check if build has exists */ + $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); + if ($build->isEmpty()) { + throw new Exception('Build not found', 404); + } + + if ($build->getAttribute('status') !== 'ready') { + throw new Exception('Build not ready', 400); + } + + /** Check if runtime is supported */ + $runtimes = Config::getParam('runtimes', []); + + if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400); + } + + $runtime = $runtimes[$function->getAttribute('runtime')]; + + /** Create execution or update execution status */ + $execution = $dbForProject->getDocument('executions', $executionId ?? ''); + if ($execution->isEmpty()) { + $executionId = ID::unique(); + $execution = $dbForProject->createDocument('executions', new Document([ + '$id' => $executionId, + '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], + 'functionId' => $functionId, + 'deploymentId' => $deploymentId, + 'trigger' => $trigger, + 'status' => 'waiting', + 'statusCode' => 0, + 'response' => '', + 'stderr' => '', + 'duration' => 0.0, + 'search' => implode(' ', [$functionId, $executionId]), + ])); + + if ($execution->isEmpty()) { + throw new Exception('Failed to create or read execution'); + } + } + $execution->setAttribute('status', 'processing'); + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + + if ($build->getAttribute('status') !== 'ready') { + throw new Exception('Build not ready', 400); + } + + /** Check if runtime is supported */ + $runtimes = Config::getParam('runtimes', []); + + if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400); + } + + $runtime = $runtimes[$function->getAttribute('runtime')]; + + /** Create execution or update execution status */ + $execution = $dbForProject->getDocument('executions', $executionId ?? ''); + if ($execution->isEmpty()) { + $executionId = ID::unique(); + $execution = $dbForProject->createDocument('executions', new Document([ + '$id' => $executionId, + '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], + 'functionId' => $functionId, + 'deploymentId' => $deploymentId, + 'trigger' => $trigger, + 'status' => 'waiting', + 'statusCode' => 0, + 'response' => '', + 'stderr' => '', + 'duration' => 0.0, + 'search' => implode(' ', [$functionId, $executionId]), + ])); + + if ($execution->isEmpty()) { + throw new Exception('Failed to create or read execution'); + } + } + $execution->setAttribute('status', 'processing'); + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + + $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { + $carry[$var->getAttribute('key')] = $var->getAttribute('value'); + return $carry; + }, []); + + /** Collect environment variables */ + $vars = \array_merge($vars, [ + 'APPWRITE_FUNCTION_ID' => $functionId, + 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'), + 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, + 'APPWRITE_FUNCTION_TRIGGER' => $trigger, + 'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(), + 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '', + 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '', + 'APPWRITE_FUNCTION_EVENT' => $event ?? '', + 'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '', + 'APPWRITE_FUNCTION_DATA' => $data ?? '', + 'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '', + 'APPWRITE_FUNCTION_JWT' => $jwt ?? '', + ]); + + /** Execute function */ + try { + $executionResponse = $executor->createExecution( + projectId: $project->getId(), + deploymentId: $deploymentId, + path: $build->getAttribute('outputPath', ''), + vars: $vars, + entrypoint: $deployment->getAttribute('entrypoint', ''), + data: $vars['APPWRITE_FUNCTION_DATA'] ?? '', + runtime: $function->getAttribute('runtime', ''), + baseImage: $runtime['image'], + timeout: $function->getAttribute('timeout', 0) + ); + + /** Update execution status */ + $execution + ->setAttribute('status', $executionResponse['status']) + ->setAttribute('statusCode', $executionResponse['statusCode']) + ->setAttribute('response', $executionResponse['response']) + ->setAttribute('stdout', $executionResponse['stdout']) + ->setAttribute('stderr', $executionResponse['stderr']) + ->setAttribute('duration', $executionResponse['duration']); + } catch (\Throwable $th) { + $interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt())); + $execution + ->setAttribute('duration', (float)$interval->format('%s.%f')) + ->setAttribute('status', 'failed') + ->setAttribute('statusCode', $th->getCode()) + ->setAttribute('stderr', $th->getMessage()); + Console::error($th->getMessage()); + } + + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + + /** Trigger Webhook */ + $executionModel = new Execution(); + $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); + $executionUpdate + ->setProject($project) + ->setUser($user) + ->setEvent('functions.[functionId].executions.[executionId].update') + ->setParam('functionId', $function->getId()) + ->setParam('executionId', $execution->getId()) + ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) + ->trigger(); + + /** Trigger Functions */ + $executionUpdate + ->setClass(Event::FUNCTIONS_CLASS_NAME) + ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + ->trigger(); + + /** Trigger realtime event */ + $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ + 'functionId' => $function->getId(), + 'executionId' => $execution->getId() + ]); + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $execution + ); + Realtime::send( + projectId: 'console', + payload: $execution->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + Realtime::send( + projectId: $project->getId(), + payload: $execution->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + + /** Update usage stats */ + global $register; + if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { + $statsd = $register->get('statsd'); + $usage = new Stats($statsd); + $usage + ->setParam('projectId', $project->getId()) + ->setParam('functionId', $function->getId()) + ->setParam('executions.{scope}.compute', 1) + ->setParam('executionStatus', $execution->getAttribute('status', '')) + ->setParam('executionTime', $execution->getAttribute('duration')) + ->setParam('networkRequestSize', 0) + ->setParam('networkResponseSize', 0) + ->submit(); + } +}; + +$adapter = new Queue\Adapter\Swoole($client, $workerNumber, Event::FUNCTIONS_QUEUE_NAME); +$server = new Queue\Server($adapter); + +$server->job() + ->inject('message') + ->inject('dbForProject') + ->action(function (Message $message, Database $dbForProject) use ($execute) { + $args = $message->getPayload()['value'] ?? []; + $type = $message->getPayload()['type'] ?? ''; + $events = $args['events'] ?? []; + $project = new Document($args['project'] ?? []); + $user = new Document($args['user'] ?? []); + // Where $payload comes from + $payload = json_encode($args['payload'] ?? []); if ($project->getId() === 'console') { return; } - $database = $this->getProjectDB($project); - /** * Handle Event execution. */ if (!empty($events)) { $limit = 30; - $sum = 30; - $offset = 0; - $functions = []; - /** @var Document[] $functions */ + $sum = $limit; + $total = 0; + $latestDocument = null; - while ($sum >= $limit) { - $functions = $database->find('functions', [ - Query::limit($limit), - Query::offset($offset), - Query::orderAsc('name'), - ]); - $sum = \count($functions); - $offset = $offset + $limit; + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForProject->find('functions', \array_merge($paginationQueries, [ + Query::orderAsc('name') + ])); + + $sum = count($results); + $total = $total + $sum; Console::log('Fetched ' . $sum . ' functions...'); - foreach ($functions as $function) { + foreach ($results as $function) { if (!array_intersect($events, $function->getAttribute('events', []))) { continue; } Console::success('Iterating function: ' . $function->getAttribute('name')); - $this->execute( - project: $project, - function: $function, - dbForProject: $database, - trigger: 'event', - // Pass first, most verbose event pattern - event: $events[0], - eventData: $payload, - user: $user - ); + // As event, pass first, most verbose event pattern + call_user_func($execute, $project, $function, $dbForProject, 'event', null, $events[0], $payload, null, $user, null); Console::success('Triggered function: ' . $events[0]); } + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } return; @@ -103,33 +318,20 @@ class FunctionsV1 extends Worker /** * Handle Schedule and HTTP execution. */ - $user = new Document($this->args['user'] ?? []); - $project = new Document($this->args['project'] ?? []); - $execution = new Document($this->args['execution'] ?? []); - $function = new Document($this->args['function'] ?? []); + $user = new Document($args['user'] ?? []); + $project = new Document($args['project'] ?? []); + $execution = new Document($args['execution'] ?? []); + $function = new Document($args['function'] ?? []); switch ($type) { case 'http': - $jwt = $this->args['jwt'] ?? ''; - $data = $this->args['data'] ?? ''; - - $function = $database->getDocument('functions', $execution->getAttribute('functionId')); - - $this->execute( - project: $project, - function: $function, - dbForProject: $database, - executionId: $execution->getId(), - trigger: 'http', - data: $data, - user: $user, - jwt: $jwt - ); - + $jwt = $args['jwt'] ?? ''; + $data = $args['data'] ?? ''; + $function = $dbForProject->getDocument('functions', $execution->getAttribute('functionId')); + call_user_func($execute, $project, $function, $dbForProject, 'http', $execution->getId(), null, null, $data, $user, $jwt); break; case 'schedule': - $functionOriginal = $function; /* * 1. Get Original Task * 2. Check for updates @@ -143,242 +345,54 @@ class FunctionsV1 extends Worker * If error count bigger than allowed change status to pause */ - // Reschedule - $function = $database->getDocument('functions', $function->getId()); - - if (empty($function->getId())) { - throw new Exception('Function not found (' . $function->getId() . ')'); - } - - if ($functionOriginal->getAttribute('schedule') !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run. - return; - } - - if ($functionOriginal->getAttribute('scheduleUpdatedAt') !== $function->getAttribute('scheduleUpdatedAt')) { // Double execution due to rapid cron changes, ignore this run. - return; - } - - $cron = new CronExpression($function->getAttribute('schedule')); - $next = DateTime::format($cron->getNextRunDate()); - - $function = $function - ->setAttribute('scheduleNext', $next) - ->setAttribute('schedulePrevious', DateTime::now()); - - $function = $database->updateDocument( - 'functions', - $function->getId(), - $function - ); - - $reschedule = new Func(); - $reschedule - ->setFunction($function) - ->setType('schedule') - ->setUser($user) - ->setProject($project) - ->schedule(new \DateTime($next)); - ; - - $this->execute( - project: $project, - function: $function, - dbForProject: $database, - trigger: 'schedule' - ); - + call_user_func($execute, $project, $function, $dbForProject, 'schedule', null, null, null, null, null, null); break; } - } + }); - private function execute( - Document $project, - Document $function, - Database $dbForProject, - string $trigger, - string $executionId = null, - string $event = null, - string $eventData = null, - string $data = null, - ?Document $user = null, - string $jwt = null - ) { +$server + ->error() + ->inject('error') + ->inject('logger') + ->inject('register') + ->action(function ($error, $logger, $register) { - $user ??= new Document(); - $functionId = $function->getId(); - $deploymentId = $function->getAttribute('deployment', ''); + $version = App::getEnv('_APP_VERSION', 'UNKNOWN'); - /** Check if deployment exists */ - $deployment = $dbForProject->getDocument('deployments', $deploymentId); - - if ($deployment->getAttribute('resourceId') !== $functionId) { - throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); + if ($error instanceof PDOException) { + throw $error; } - if ($deployment->isEmpty()) { - throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); + if ($error->getCode() >= 500 || $error->getCode() === 0) { + $log = new Log(); + + $log->setNamespace("appwrite-worker"); + $log->setServer(\gethostname()); + $log->setVersion($version); + $log->setType(Log::TYPE_ERROR); + $log->setMessage($error->getMessage()); + $log->setAction('appwrite-worker-functions'); + $log->addTag('verboseType', get_class($error)); + $log->addTag('code', $error->getCode()); + $log->addExtra('file', $error->getFile()); + $log->addExtra('line', $error->getLine()); + $log->addExtra('trace', $error->getTraceAsString()); + $log->addExtra('detailedTrace', $error->getTrace()); + $log->addExtra('roles', \Utopia\Database\Validator\Authorization::$roles); + + $isProduction = App::getEnv('_APP_ENV', 'development') === 'production'; + $log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING); + + $logger->addLog($log); } - /** Check if build has exists */ - $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); - if ($build->isEmpty()) { - throw new Exception('Build not found', 404); - } + Console::error('[Error] Type: ' . get_class($error)); + Console::error('[Error] Message: ' . $error->getMessage()); + Console::error('[Error] File: ' . $error->getFile()); + Console::error('[Error] Line: ' . $error->getLine()); - if ($build->getAttribute('status') !== 'ready') { - throw new Exception('Build not ready', 400); - } + $register->get('pools')->reclaim(); + }); - /** Check if runtime is supported */ - $runtimes = Config::getParam('runtimes', []); - - if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { - throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400); - } - - $runtime = $runtimes[$function->getAttribute('runtime')]; - - /** Create execution or update execution status */ - $execution = $dbForProject->getDocument('executions', $executionId ?? ''); - if ($execution->isEmpty()) { - $executionId = ID::unique(); - $execution = $dbForProject->createDocument('executions', new Document([ - '$id' => $executionId, - '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], - 'functionId' => $functionId, - 'deploymentId' => $deploymentId, - 'trigger' => $trigger, - 'status' => 'waiting', - 'statusCode' => 0, - 'response' => '', - 'stderr' => '', - 'duration' => 0.0, - 'search' => implode(' ', [$functionId, $executionId]), - ])); - - if ($execution->isEmpty()) { - throw new Exception('Failed to create or read execution'); - } - } - $execution->setAttribute('status', 'processing'); - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - - $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { - $carry[$var->getAttribute('key')] = $var->getAttribute('value'); - return $carry; - }, []); - - /** Collect environment variables */ - $vars = \array_merge($vars, [ - 'APPWRITE_FUNCTION_ID' => $functionId, - 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'), - 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, - 'APPWRITE_FUNCTION_TRIGGER' => $trigger, - 'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(), - 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '', - 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '', - 'APPWRITE_FUNCTION_EVENT' => $event ?? '', - 'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '', - 'APPWRITE_FUNCTION_DATA' => $data ?? '', - 'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '', - 'APPWRITE_FUNCTION_JWT' => $jwt ?? '', - ]); - - /** Execute function */ - try { - $executionResponse = $this->executor->createExecution( - projectId: $project->getId(), - deploymentId: $deploymentId, - path: $build->getAttribute('outputPath', ''), - vars: $vars, - entrypoint: $deployment->getAttribute('entrypoint', ''), - data: $vars['APPWRITE_FUNCTION_DATA'] ?? '', - runtime: $function->getAttribute('runtime', ''), - timeout: $function->getAttribute('timeout', 0), - baseImage: $runtime['image'] - ); - - /** Update execution status */ - $execution - ->setAttribute('status', $executionResponse['status']) - ->setAttribute('statusCode', $executionResponse['statusCode']) - ->setAttribute('response', $executionResponse['response']) - ->setAttribute('stdout', $executionResponse['stdout']) - ->setAttribute('stderr', $executionResponse['stderr']) - ->setAttribute('duration', $executionResponse['duration']); - } catch (\Throwable $th) { - $interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt())); - $execution - ->setAttribute('duration', (float)$interval->format('%s.%f')) - ->setAttribute('status', 'failed') - ->setAttribute('statusCode', $th->getCode()) - ->setAttribute('stderr', $th->getMessage()); - Console::error($th->getMessage()); - } - - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - - /** Trigger Webhook */ - $executionModel = new Execution(); - $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); - $executionUpdate - ->setProject($project) - ->setUser($user) - ->setEvent('functions.[functionId].executions.[executionId].update') - ->setParam('functionId', $function->getId()) - ->setParam('executionId', $execution->getId()) - ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) - ->trigger(); - - /** Trigger Functions */ - $executionUpdate - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) - ->trigger(); - - /** Trigger realtime event */ - $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ - 'functionId' => $function->getId(), - 'executionId' => $execution->getId() - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $execution - ); - Realtime::send( - projectId: 'console', - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - - /** Update usage stats */ - global $register; - if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { - $statsd = $register->get('statsd'); - $usage = new Stats($statsd); - $usage - ->setParam('projectId', $project->getId()) - ->setParam('functionId', $function->getId()) - ->setParam('executions.{scope}.compute', 1) - ->setParam('executionStatus', $execution->getAttribute('status', '')) - ->setParam('executionTime', $execution->getAttribute('duration')) - ->setParam('networkRequestSize', 0) - ->setParam('networkResponseSize', 0) - ->submit(); - } - } - - public function shutdown(): void - { - } -} +$server->workerStart(); +$server->start(); diff --git a/composer.json b/composer.json index e09bec422..6843ef689 100644 --- a/composer.json +++ b/composer.json @@ -61,7 +61,8 @@ "utopia-php/websocket": "0.1.0", "utopia-php/image": "0.5.*", "utopia-php/orchestration": "0.6.*", - "utopia-php/pools": "0.1.*", + "utopia-php/queue": "0.4.0", + "utopia-php/pools": "dev-upgrade-cli as 0.2.0", "resque/php-resque": "1.3.6", "matomo/device-detector": "6.0.0", "dragonmantank/cron-expression": "3.3.1", diff --git a/composer.lock b/composer.lock index 28d48b7f3..6c6d12945 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "f3beee3a829a19e53b311052111bde2c", + "content-hash": "a744959294e219fff6ea9c17f9fb0705", "packages": [ { "name": "adhocore/jwt", @@ -115,15 +115,15 @@ }, { "name": "appwrite/php-runtimes", - "version": "0.11.0", + "version": "0.11.1", "source": { "type": "git", "url": "https://github.com/appwrite/runtimes.git", - "reference": "547fc026e11c0946846a8ac690898f5bf53be101" + "reference": "9d74a477ba3333cbcfac565c46fcf19606b7b603" }, "require": { "php": ">=8.0", - "utopia-php/system": "0.4.*" + "utopia-php/system": "0.6.*" }, "require-dev": { "phpunit/phpunit": "^9.3", @@ -154,7 +154,7 @@ "php", "runtimes" ], - "time": "2022-08-15T14:03:36+00:00" + "time": "2022-11-07T16:45:52+00:00" }, { "name": "chillerlan/php-qrcode", @@ -300,16 +300,16 @@ }, { "name": "colinmollenhour/credis", - "version": "v1.13.1", + "version": "v1.14.0", "source": { "type": "git", "url": "https://github.com/colinmollenhour/credis.git", - "reference": "85df015088e00daf8ce395189de22c8eb45c8d49" + "reference": "dccc8a46586475075fbb012d8bd523b8a938c2dc" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/colinmollenhour/credis/zipball/85df015088e00daf8ce395189de22c8eb45c8d49", - "reference": "85df015088e00daf8ce395189de22c8eb45c8d49", + "url": "https://api.github.com/repos/colinmollenhour/credis/zipball/dccc8a46586475075fbb012d8bd523b8a938c2dc", + "reference": "dccc8a46586475075fbb012d8bd523b8a938c2dc", "shasum": "" }, "require": { @@ -341,9 +341,9 @@ "homepage": "https://github.com/colinmollenhour/credis", "support": { "issues": "https://github.com/colinmollenhour/credis/issues", - "source": "https://github.com/colinmollenhour/credis/tree/v1.13.1" + "source": "https://github.com/colinmollenhour/credis/tree/v1.14.0" }, - "time": "2022-06-20T22:56:59+00:00" + "time": "2022-11-09T01:18:39+00:00" }, { "name": "composer/package-versions-deprecated", @@ -693,16 +693,16 @@ }, { "name": "guzzlehttp/psr7", - "version": "2.4.1", + "version": "2.4.3", "source": { "type": "git", "url": "https://github.com/guzzle/psr7.git", - "reference": "69568e4293f4fa993f3b0e51c9723e1e17c41379" + "reference": "67c26b443f348a51926030c83481b85718457d3d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/guzzle/psr7/zipball/69568e4293f4fa993f3b0e51c9723e1e17c41379", - "reference": "69568e4293f4fa993f3b0e51c9723e1e17c41379", + "url": "https://api.github.com/repos/guzzle/psr7/zipball/67c26b443f348a51926030c83481b85718457d3d", + "reference": "67c26b443f348a51926030c83481b85718457d3d", "shasum": "" }, "require": { @@ -792,7 +792,7 @@ ], "support": { "issues": "https://github.com/guzzle/psr7/issues", - "source": "https://github.com/guzzle/psr7/tree/2.4.1" + "source": "https://github.com/guzzle/psr7/tree/2.4.3" }, "funding": [ { @@ -808,7 +808,7 @@ "type": "tidelift" } ], - "time": "2022-08-28T14:45:39+00:00" + "time": "2022-10-26T14:07:24+00:00" }, { "name": "influxdb/influxdb-php", @@ -931,6 +931,72 @@ }, "time": "2021-02-04T16:20:16+00:00" }, + { + "name": "laravel/pint", + "version": "v1.2.0", + "source": { + "type": "git", + "url": "https://github.com/laravel/pint.git", + "reference": "1d276e4c803397a26cc337df908f55c2a4e90d86" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/laravel/pint/zipball/1d276e4c803397a26cc337df908f55c2a4e90d86", + "reference": "1d276e4c803397a26cc337df908f55c2a4e90d86", + "shasum": "" + }, + "require": { + "ext-json": "*", + "ext-mbstring": "*", + "ext-tokenizer": "*", + "ext-xml": "*", + "php": "^8.0" + }, + "require-dev": { + "friendsofphp/php-cs-fixer": "^3.11.0", + "illuminate/view": "^9.27", + "laravel-zero/framework": "^9.1.3", + "mockery/mockery": "^1.5.0", + "nunomaduro/larastan": "^2.2", + "nunomaduro/termwind": "^1.14.0", + "pestphp/pest": "^1.22.1" + }, + "bin": [ + "builds/pint" + ], + "type": "project", + "autoload": { + "psr-4": { + "App\\": "app/", + "Database\\Seeders\\": "database/seeders/", + "Database\\Factories\\": "database/factories/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Nuno Maduro", + "email": "enunomaduro@gmail.com" + } + ], + "description": "An opinionated code formatter for PHP.", + "homepage": "https://laravel.com", + "keywords": [ + "format", + "formatter", + "lint", + "linter", + "php" + ], + "support": { + "issues": "https://github.com/laravel/pint/issues", + "source": "https://github.com/laravel/pint" + }, + "time": "2022-09-13T15:07:15+00:00" + }, { "name": "matomo/device-detector", "version": "6.0.0", @@ -2431,23 +2497,24 @@ }, { "name": "utopia-php/pools", - "version": "0.1.0", + "version": "dev-upgrade-cli", "source": { "type": "git", "url": "https://github.com/utopia-php/pools.git", - "reference": "5a467a569a80aefc846a97dc195b4adc2fd71805" + "reference": "88a2c1ed2badbfdf2787ce0a12def2c988fc1097" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/pools/zipball/5a467a569a80aefc846a97dc195b4adc2fd71805", - "reference": "5a467a569a80aefc846a97dc195b4adc2fd71805", + "url": "https://api.github.com/repos/utopia-php/pools/zipball/88a2c1ed2badbfdf2787ce0a12def2c988fc1097", + "reference": "88a2c1ed2badbfdf2787ce0a12def2c988fc1097", "shasum": "" }, "require": { "ext-mongodb": "*", "ext-pdo": "*", "ext-redis": "*", - "php": ">=8.0" + "php": ">=8.0", + "utopia-php/cli": "0.13.*" }, "require-dev": { "phpunit/phpunit": "^9.4", @@ -2478,9 +2545,9 @@ ], "support": { "issues": "https://github.com/utopia-php/pools/issues", - "source": "https://github.com/utopia-php/pools/tree/0.1.0" + "source": "https://github.com/utopia-php/pools/tree/upgrade-cli" }, - "time": "2022-10-11T19:31:07+00:00" + "time": "2022-11-04T08:33:04+00:00" }, { "name": "utopia-php/preloader", @@ -2535,6 +2602,67 @@ }, "time": "2020-10-24T07:04:59+00:00" }, + { + "name": "utopia-php/queue", + "version": "0.4.0", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/queue.git", + "reference": "0cad4cf4231377aa6c67956b51ba1954e0d02166" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/0cad4cf4231377aa6c67956b51ba1954e0d02166", + "reference": "0cad4cf4231377aa6c67956b51ba1954e0d02166", + "shasum": "" + }, + "require": { + "php": ">=8.0", + "utopia-php/cli": "0.13.*", + "utopia-php/framework": "0.*.*" + }, + "require-dev": { + "laravel/pint": "^0.2.3", + "phpstan/phpstan": "^1.8", + "phpunit/phpunit": "^9.5.5", + "swoole/ide-helper": "4.8.8", + "workerman/workerman": "^4.0" + }, + "suggest": { + "ext-swoole": "Needed to support Swoole.", + "workerman/workerman": "Needed to support Workerman." + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Queue\\": "src/Queue" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Torsten Dittmann", + "email": "torsten@appwrite.io" + } + ], + "description": "A powerful task queue.", + "keywords": [ + "Tasks", + "framework", + "php", + "queue", + "upf", + "utopia" + ], + "support": { + "issues": "https://github.com/utopia-php/queue/issues", + "source": "https://github.com/utopia-php/queue/tree/0.4.0" + }, + "time": "2022-10-31T06:23:08+00:00" + }, { "name": "utopia-php/registry", "version": "0.5.0", @@ -2700,23 +2828,25 @@ }, { "name": "utopia-php/system", - "version": "0.4.0", + "version": "0.6.0", "source": { "type": "git", "url": "https://github.com/utopia-php/system.git", - "reference": "67c92c66ce8f0cc925a00bca89f7a188bf9183c0" + "reference": "289c4327713deadc9c748b5317d248133a02f245" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/system/zipball/67c92c66ce8f0cc925a00bca89f7a188bf9183c0", - "reference": "67c92c66ce8f0cc925a00bca89f7a188bf9183c0", + "url": "https://api.github.com/repos/utopia-php/system/zipball/289c4327713deadc9c748b5317d248133a02f245", + "reference": "289c4327713deadc9c748b5317d248133a02f245", "shasum": "" }, "require": { + "laravel/pint": "1.2.*", "php": ">=7.4" }, "require-dev": { "phpunit/phpunit": "^9.3", + "squizlabs/php_codesniffer": "^3.6", "vimeo/psalm": "4.0.1" }, "type": "library", @@ -2749,9 +2879,9 @@ ], "support": { "issues": "https://github.com/utopia-php/system/issues", - "source": "https://github.com/utopia-php/system/tree/0.4.0" + "source": "https://github.com/utopia-php/system/tree/0.6.0" }, - "time": "2021-02-04T14:14:49+00:00" + "time": "2022-11-07T13:51:59+00:00" }, { "name": "utopia-php/websocket", @@ -3574,16 +3704,16 @@ }, { "name": "phpunit/php-code-coverage", - "version": "9.2.17", + "version": "9.2.18", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-code-coverage.git", - "reference": "aa94dc41e8661fe90c7316849907cba3007b10d8" + "reference": "12fddc491826940cf9b7e88ad9664cf51f0f6d0a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/aa94dc41e8661fe90c7316849907cba3007b10d8", - "reference": "aa94dc41e8661fe90c7316849907cba3007b10d8", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/12fddc491826940cf9b7e88ad9664cf51f0f6d0a", + "reference": "12fddc491826940cf9b7e88ad9664cf51f0f6d0a", "shasum": "" }, "require": { @@ -3639,7 +3769,7 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/php-code-coverage/issues", - "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.17" + "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.18" }, "funding": [ { @@ -3647,7 +3777,7 @@ "type": "github" } ], - "time": "2022-08-30T12:24:04+00:00" + "time": "2022-10-27T13:35:33+00:00" }, { "name": "phpunit/php-file-iterator", @@ -5402,11 +5532,18 @@ "version": "dev-feat-update-cache-lib", "alias": "0.26.1", "alias_normalized": "0.26.1.0" + }, + { + "package": "utopia-php/pools", + "version": "dev-upgrade-cli", + "alias": "0.2.0", + "alias_normalized": "0.2.0.0" } ], "minimum-stability": "stable", "stability-flags": { - "utopia-php/database": 20 + "utopia-php/database": 20, + "utopia-php/pools": 20 }, "prefer-stable": false, "prefer-lowest": false, @@ -5431,5 +5568,5 @@ "platform-overrides": { "php": "8.0" }, - "plugin-api-version": "2.3.0" + "plugin-api-version": "2.2.0" } diff --git a/src/Appwrite/Utopia/Response/Model/Execution.php b/src/Appwrite/Utopia/Response/Model/Execution.php index 13011a24b..987a140df 100644 --- a/src/Appwrite/Utopia/Response/Model/Execution.php +++ b/src/Appwrite/Utopia/Response/Model/Execution.php @@ -6,6 +6,7 @@ use Appwrite\Utopia\Response; use Appwrite\Utopia\Response\Model; use Utopia\Database\Role; + class Execution extends Model { public function __construct() diff --git a/src/Appwrite/Utopia/Response/Model/Func.php b/src/Appwrite/Utopia/Response/Model/Func.php index c7e69fff8..540b14387 100644 --- a/src/Appwrite/Utopia/Response/Model/Func.php +++ b/src/Appwrite/Utopia/Response/Model/Func.php @@ -81,18 +81,6 @@ class Func extends Model 'default' => '', 'example' => '5 4 * * *', ]) - ->addRule('scheduleNext', [ - 'type' => self::TYPE_DATETIME, - 'description' => 'Function\'s next scheduled execution time in ISO 8601 format.', - 'default' => '', - 'example' => self::TYPE_DATETIME_EXAMPLE, - ]) - ->addRule('schedulePrevious', [ - 'type' => self::TYPE_DATETIME, - 'description' => 'Function\'s previous scheduled execution time in ISO 8601 format.', - 'default' => '', - 'example' => self::TYPE_DATETIME_EXAMPLE, - ]) ->addRule('timeout', [ 'type' => self::TYPE_INTEGER, 'description' => 'Function execution timeout in seconds.',