1
0
Fork 0
mirror of synced 2024-06-26 18:20:43 +12:00

function worker

This commit is contained in:
shimon 2022-11-09 19:01:43 +02:00
parent 9d641b985a
commit 1761b77d0f
9 changed files with 625 additions and 386 deletions

View file

@ -5,7 +5,6 @@ use Appwrite\Auth\Auth;
use Appwrite\Event\Build;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Validator\Event as ValidatorEvent;
use Appwrite\Extend\Exception;
use Appwrite\Utopia\Database\Validator\CustomId;
@ -14,6 +13,7 @@ use Utopia\Database\Permission;
use Utopia\Database\Role;
use Utopia\Database\Validator\UID;
use Appwrite\Usage\Stats;
use Utopia\Pools\Group;
use Utopia\Storage\Device;
use Utopia\Storage\Validator\File;
use Utopia\Storage\Validator\FileExt;
@ -41,6 +41,7 @@ use Utopia\CLI\Console;
use Utopia\Database\Validator\Roles;
use Utopia\Validator\Boolean;
use Utopia\Database\Exception\Duplicate as DuplicateException;
use Utopia\Queue\Client as queue;
include_once __DIR__ . '/../shared/api.php';
@ -1040,8 +1041,6 @@ App::post('/v1/functions/:functionId/deployments/:deploymentId/builds/:buildId')
$response->noContent();
});
App::post('/v1/functions/:functionId/executions')
->groups(['api', 'functions'])
->desc('Create Execution')
@ -1067,7 +1066,8 @@ App::post('/v1/functions/:functionId/executions')
->inject('events')
->inject('usage')
->inject('mode')
->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode) {
->inject('pools')
->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Group $pools) {
$function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId));
@ -1155,17 +1155,18 @@ App::post('/v1/functions/:functionId/executions')
->setContext('function', $function);
if ($async) {
$event = new Func();
$event
->setType('http')
->setExecution($execution)
->setFunction($function)
->setData($data)
->setJWT($jwt)
->setProject($project)
->setUser($user);
$event->trigger();
$queue = new queue(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource());
$queue->enqueue([
'type' => 'http',
'value' => [
'type' => 'http',
'execution' => $execution,
'function' => $function,
'data' => $data,
'jwt' => $jwt,
'project' => $project,
'user' => $user
]]);
return $response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
@ -1198,11 +1199,11 @@ App::post('/v1/functions/:functionId/executions')
deploymentId: $deployment->getId(),
path: $build->getAttribute('outputPath', ''),
vars: $vars,
data: $data,
entrypoint: $deployment->getAttribute('entrypoint', ''),
data: $data,
runtime: $function->getAttribute('runtime', ''),
timeout: $function->getAttribute('timeout', 0),
baseImage: $runtime['image']
baseImage: $runtime['image'],
timeout: $function->getAttribute('timeout', 0)
);
/** Update execution status */

View file

@ -38,6 +38,8 @@ use Appwrite\Network\Validator\IP;
use Appwrite\Network\Validator\URL;
use Appwrite\OpenSSL\OpenSSL;
use Appwrite\URL\URL as AppwriteURL;
use Utopia\Queue\Client as SyncOut;
use Utopia\Queue\Connection\Redis as QueueRedis;
use Appwrite\Usage\Stats;
use Appwrite\Utopia\View;
use Utopia\App;
@ -75,6 +77,7 @@ use Ahc\Jwt\JWTException;
use MaxMind\Db\Reader;
use PHPMailer\PHPMailer\PHPMailer;
use Swoole\Database\PDOProxy;
use Utopia\Queue;
const APP_NAME = 'Appwrite';
const APP_DOMAIN = 'appwrite.io';
@ -497,6 +500,7 @@ $register->set('logger', function () {
$adapter = new $classname($providerConfig);
return new Logger($adapter);
});
$register->set('pools', function () {
$group = new Group();
@ -522,30 +526,35 @@ $register->set('pools', function () {
'dsns' => App::getEnv('_APP_CONNECTIONS_DB_CONSOLE', $fallbackForDB),
'multiple' => false,
'schemes' => ['mariadb', 'mysql'],
'useResource' => true,
],
'database' => [
'type' => 'database',
'dsns' => App::getEnv('_APP_CONNECTIONS_DB_PROJECT', $fallbackForDB),
'multiple' => true,
'schemes' => ['mariadb', 'mysql'],
'useResource' => true,
],
'queue' => [
'type' => 'queue',
'dsns' => App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis),
'multiple' => false,
'schemes' => ['redis'],
'useResource' => false,
],
'pubsub' => [
'type' => 'pubsub',
'dsns' => App::getEnv('_APP_CONNECTIONS_PUBSUB', $fallbackForRedis),
'multiple' => false,
'schemes' => ['redis'],
'useResource' => true,
],
'cache' => [
'type' => 'cache',
'dsns' => App::getEnv('_APP_CONNECTIONS_CACHE', $fallbackForRedis),
'multiple' => true,
'schemes' => ['redis'],
'useResource' => true,
],
];
@ -554,6 +563,7 @@ $register->set('pools', function () {
$dsns = $connection['dsns'] ?? '';
$multipe = $connection['multiple'] ?? false;
$schemes = $connection['schemes'] ?? [];
$useResource = $connection['useResource'] ?? true;
$config = [];
$dsns = explode(',', $connection['dsns'] ?? '');
@ -576,7 +586,7 @@ $register->set('pools', function () {
$dsnScheme = $dsn->getScheme();
$dsnDatabase = $dsn->getDatabase();
if (!in_array($dsnScheme, $schemes)) {
if (!in_array($dsnScheme, $schemes) && $useResource) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, "Invalid console database scheme");
}
@ -635,13 +645,15 @@ $register->set('pools', function () {
};
$adapter->setDefaultDatabase($dsn->getDatabase());
break;
case 'queue':
$adapter = $resource();
break;
case 'pubsub':
break;
$adapter = $resource();
case 'queue':
$adapter = match ($dsn->getScheme()) {
'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()),
default => 'bla'
};
break;
case 'cache':
$adapter = match ($dsn->getScheme()) {
@ -664,12 +676,6 @@ $register->set('pools', function () {
Config::setParam('pools-' . $key, $config);
}
try {
$group->fill();
} catch (\Throwable $th) {
Console::error('Connection failure: ' . $th->getMessage());
}
return $group;
});
$register->set('influxdb', function () {
@ -847,8 +853,7 @@ App::setResource('messaging', fn() => new Phone());
App::setResource('usage', function ($register) {
return new Stats($register->get('statsd'));
}, ['register']);
App::setResource('clients', function ($request, $console, $project) {
App::setResource('clients', function ($request, $console, $project) use ($register) {
$console->setAttribute('platforms', [ // Always allow current host
'$collection' => ID::custom('platforms'),
'name' => 'Current Host',
@ -1024,6 +1029,24 @@ App::setResource('console', function () {
]);
}, []);
App::setResource('queue', function () {
$fallbackForRedis = AppwriteURL::unparse([
'scheme' => 'redis',
'host' => App::getEnv('_APP_REDIS_HOST', 'redis'),
'port' => App::getEnv('_APP_REDIS_PORT', '6379'),
'user' => App::getEnv('_APP_REDIS_USER', ''),
'pass' => App::getEnv('_APP_REDIS_PASS', ''),
]);
$connection = App::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis);
$dsns = explode(',', $connection ?? '');
$dsn = explode('=', $dsns[0]);
$dsn = $dsn[1] ?? '';
return new DSN($dsn);
}, []);
App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;

View file

@ -3,12 +3,14 @@
global $cli;
global $register;
use Appwrite\Event\Event;
use Cron\CronExpression;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Query;
use Swoole\Timer;
use Utopia\Queue\Client as worker;
const FUNCTION_UPDATE_TIMER = 60; //seconds
const FUNCTION_ENQUEUE_TIMER = 60; //seconds
@ -108,7 +110,7 @@ $cli
* The timer updates $functions from db on last resourceUpdatedAt attr in X-min.
*/
Co\run(
function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
function () use ($register, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) {
$time = DateTime::now();
$limit = 1000;
@ -162,7 +164,7 @@ $cli
/**
* The timer sends to worker every 1 min and re-enqueue matched functions.
*/
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) {
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($register, $dbForConsole, &$functions, &$queue) {
$timerStart = \microtime(true);
$time = DateTime::now();
$timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME); /** 5 min */
@ -175,9 +177,17 @@ $cli
console::info(count($schedule) . " functions sent to worker for time slot " . $slot);
foreach ($schedule as $function) {
/**
* Enqueue function (here should be the Enqueue call
*/
$pools = $register->get('pools');
$worker = new worker(Event::FUNCTIONS_QUEUE_NAME, $pools->get('queue')->pop()->getResource());
$project = $dbForConsole->getDocument('projects', $function['projectId']);
$worker
->enqueue([
'type' => 'schedule',
'value' => [
'project' => $project,
'function' => getProjectDB($project)->getDocument('functions', $function['projectId']),
]
]);
//Console::warning("Enqueueing :{$function['resourceId']}");
$cron = new CronExpression($function['schedule']);
$next = DateTime::format($cron->getNextRunDate());

64
app/worker.php Normal file
View file

@ -0,0 +1,64 @@
<?php
require_once __DIR__ . '/init.php';
use Swoole\Runtime;
use Utopia\App;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Queue\Server;
use Utopia\Registry\Registry;
global $register;
Server::setResource('register', fn() => $register);
Server::setResource('dbForConsole', function (Cache $cache, Registry $register) {
$pools = $register->get('pools');
$dbAdapter = $pools
->get('console')
->pop()
->getResource()
;
$database = new Database($dbAdapter, $cache);
$database->setNamespace('console');
return $database;
}, ['cache', 'register']);
Server::setResource('cache', function (Registry $register) {
$pools = $register->get('pools');
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
}
return new Cache(new Sharding($adapters));
}, ['register']);
App::setResource('logger', function ($register) {
return $register->get('logger');
}, ['register']);
$pools = $register->get('pools');
$client = $pools->get('queue')->pop()->getResource();
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
$workerNumber = 1;
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);

View file

@ -1,100 +1,315 @@
<?php
require_once __DIR__ . '/../worker.php';
use Utopia\Queue;
use Utopia\Queue\Message;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Appwrite\Usage\Stats;
use Appwrite\Utopia\Response\Model\Execution;
use Cron\CronExpression;
use Executor\Executor;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\ID;
use Utopia\Database\Permission;
use Utopia\Database\Query;
use Utopia\Database\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Logger\Log;
require_once __DIR__ . '/../init.php';
Authorization::disable();
Authorization::setDefaultStatus(false);
Console::title('Functions V1 Worker');
Console::success(APP_NAME . ' functions worker v1 has started');
global $client;
global $workerNumber;
class FunctionsV1 extends Worker
{
private ?Executor $executor = null;
public array $args = [];
public array $allowed = [];
$executor = new Executor(App::getEnv('_APP_FUNCTIONS_PROXY_HOST'));
public function getName(): string
{
return "functions";
$execute = function (
Document $project,
Document $function,
Database $dbForProject,
string $trigger,
string $executionId = null,
string $event = null,
string $eventData = null,
string $data = null,
?Document $user = null,
string $jwt = null
) use ($executor) {
$user ??= new Document();
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
/** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
}
public function init(): void
{
$this->executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
if ($deployment->isEmpty()) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
}
public function run(): void
{
$type = $this->args['type'] ?? '';
$events = $this->args['events'] ?? [];
$project = new Document($this->args['project'] ?? []);
$user = new Document($this->args['user'] ?? []);
$payload = json_encode($this->args['payload'] ?? []);
/** Check if build has exists */
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
if ($build->isEmpty()) {
throw new Exception('Build not found', 404);
}
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready', 400);
}
/** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
}
$runtime = $runtimes[$function->getAttribute('runtime')];
/** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) {
$executionId = ID::unique();
$execution = $dbForProject->createDocument('executions', new Document([
'$id' => $executionId,
'$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))],
'functionId' => $functionId,
'deploymentId' => $deploymentId,
'trigger' => $trigger,
'status' => 'waiting',
'statusCode' => 0,
'response' => '',
'stderr' => '',
'duration' => 0.0,
'search' => implode(' ', [$functionId, $executionId]),
]));
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready', 400);
}
/** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
}
$runtime = $runtimes[$function->getAttribute('runtime')];
/** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) {
$executionId = ID::unique();
$execution = $dbForProject->createDocument('executions', new Document([
'$id' => $executionId,
'$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))],
'functionId' => $functionId,
'deploymentId' => $deploymentId,
'trigger' => $trigger,
'status' => 'waiting',
'statusCode' => 0,
'response' => '',
'stderr' => '',
'duration' => 0.0,
'search' => implode(' ', [$functionId, $executionId]),
]));
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
return $carry;
}, []);
/** Collect environment variables */
$vars = \array_merge($vars, [
'APPWRITE_FUNCTION_ID' => $functionId,
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'),
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
'APPWRITE_FUNCTION_TRIGGER' => $trigger,
'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '',
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '',
'APPWRITE_FUNCTION_EVENT' => $event ?? '',
'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '',
'APPWRITE_FUNCTION_DATA' => $data ?? '',
'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '',
'APPWRITE_FUNCTION_JWT' => $jwt ?? '',
]);
/** Execute function */
try {
$executionResponse = $executor->createExecution(
projectId: $project->getId(),
deploymentId: $deploymentId,
path: $build->getAttribute('outputPath', ''),
vars: $vars,
entrypoint: $deployment->getAttribute('entrypoint', ''),
data: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
runtime: $function->getAttribute('runtime', ''),
baseImage: $runtime['image'],
timeout: $function->getAttribute('timeout', 0)
);
/** Update execution status */
$execution
->setAttribute('status', $executionResponse['status'])
->setAttribute('statusCode', $executionResponse['statusCode'])
->setAttribute('response', $executionResponse['response'])
->setAttribute('stdout', $executionResponse['stdout'])
->setAttribute('stderr', $executionResponse['stderr'])
->setAttribute('duration', $executionResponse['duration']);
} catch (\Throwable $th) {
$interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt()));
$execution
->setAttribute('duration', (float)$interval->format('%s.%f'))
->setAttribute('status', 'failed')
->setAttribute('statusCode', $th->getCode())
->setAttribute('stderr', $th->getMessage());
Console::error($th->getMessage());
}
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
/** Trigger Webhook */
$executionModel = new Execution();
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
$executionUpdate
->setProject($project)
->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
->trigger();
/** Trigger Functions */
$executionUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
'functionId' => $function->getId(),
'executionId' => $execution->getId()
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution
);
Realtime::send(
projectId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);
$usage
->setParam('projectId', $project->getId())
->setParam('functionId', $function->getId())
->setParam('executions.{scope}.compute', 1)
->setParam('executionStatus', $execution->getAttribute('status', ''))
->setParam('executionTime', $execution->getAttribute('duration'))
->setParam('networkRequestSize', 0)
->setParam('networkResponseSize', 0)
->submit();
}
};
$adapter = new Queue\Adapter\Swoole($client, $workerNumber, Event::FUNCTIONS_QUEUE_NAME);
$server = new Queue\Server($adapter);
$server->job()
->inject('message')
->inject('dbForProject')
->action(function (Message $message, Database $dbForProject) use ($execute) {
$args = $message->getPayload()['value'] ?? [];
$type = $message->getPayload()['type'] ?? '';
$events = $args['events'] ?? [];
$project = new Document($args['project'] ?? []);
$user = new Document($args['user'] ?? []);
// Where $payload comes from
$payload = json_encode($args['payload'] ?? []);
if ($project->getId() === 'console') {
return;
}
$database = $this->getProjectDB($project);
/**
* Handle Event execution.
*/
if (!empty($events)) {
$limit = 30;
$sum = 30;
$offset = 0;
$functions = [];
/** @var Document[] $functions */
$sum = $limit;
$total = 0;
$latestDocument = null;
while ($sum >= $limit) {
$functions = $database->find('functions', [
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('name'),
]);
$sum = \count($functions);
$offset = $offset + $limit;
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
if ($latestDocument !== null) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$results = $dbForProject->find('functions', \array_merge($paginationQueries, [
Query::orderAsc('name')
]));
$sum = count($results);
$total = $total + $sum;
Console::log('Fetched ' . $sum . ' functions...');
foreach ($functions as $function) {
foreach ($results as $function) {
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
}
Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
trigger: 'event',
// Pass first, most verbose event pattern
event: $events[0],
eventData: $payload,
user: $user
);
// As event, pass first, most verbose event pattern
call_user_func($execute, $project, $function, $dbForProject, 'event', null, $events[0], $payload, null, $user, null);
Console::success('Triggered function: ' . $events[0]);
}
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
}
return;
@ -103,33 +318,20 @@ class FunctionsV1 extends Worker
/**
* Handle Schedule and HTTP execution.
*/
$user = new Document($this->args['user'] ?? []);
$project = new Document($this->args['project'] ?? []);
$execution = new Document($this->args['execution'] ?? []);
$function = new Document($this->args['function'] ?? []);
$user = new Document($args['user'] ?? []);
$project = new Document($args['project'] ?? []);
$execution = new Document($args['execution'] ?? []);
$function = new Document($args['function'] ?? []);
switch ($type) {
case 'http':
$jwt = $this->args['jwt'] ?? '';
$data = $this->args['data'] ?? '';
$function = $database->getDocument('functions', $execution->getAttribute('functionId'));
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
executionId: $execution->getId(),
trigger: 'http',
data: $data,
user: $user,
jwt: $jwt
);
$jwt = $args['jwt'] ?? '';
$data = $args['data'] ?? '';
$function = $dbForProject->getDocument('functions', $execution->getAttribute('functionId'));
call_user_func($execute, $project, $function, $dbForProject, 'http', $execution->getId(), null, null, $data, $user, $jwt);
break;
case 'schedule':
$functionOriginal = $function;
/*
* 1. Get Original Task
* 2. Check for updates
@ -143,242 +345,54 @@ class FunctionsV1 extends Worker
* If error count bigger than allowed change status to pause
*/
// Reschedule
$function = $database->getDocument('functions', $function->getId());
if (empty($function->getId())) {
throw new Exception('Function not found (' . $function->getId() . ')');
}
if ($functionOriginal->getAttribute('schedule') !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
return;
}
if ($functionOriginal->getAttribute('scheduleUpdatedAt') !== $function->getAttribute('scheduleUpdatedAt')) { // Double execution due to rapid cron changes, ignore this run.
return;
}
$cron = new CronExpression($function->getAttribute('schedule'));
$next = DateTime::format($cron->getNextRunDate());
$function = $function
->setAttribute('scheduleNext', $next)
->setAttribute('schedulePrevious', DateTime::now());
$function = $database->updateDocument(
'functions',
$function->getId(),
$function
);
$reschedule = new Func();
$reschedule
->setFunction($function)
->setType('schedule')
->setUser($user)
->setProject($project)
->schedule(new \DateTime($next));
;
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
trigger: 'schedule'
);
call_user_func($execute, $project, $function, $dbForProject, 'schedule', null, null, null, null, null, null);
break;
}
}
});
private function execute(
Document $project,
Document $function,
Database $dbForProject,
string $trigger,
string $executionId = null,
string $event = null,
string $eventData = null,
string $data = null,
?Document $user = null,
string $jwt = null
) {
$server
->error()
->inject('error')
->inject('logger')
->inject('register')
->action(function ($error, $logger, $register) {
$user ??= new Document();
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
/** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
if ($error instanceof PDOException) {
throw $error;
}
if ($deployment->isEmpty()) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
if ($error->getCode() >= 500 || $error->getCode() === 0) {
$log = new Log();
$log->setNamespace("appwrite-worker");
$log->setServer(\gethostname());
$log->setVersion($version);
$log->setType(Log::TYPE_ERROR);
$log->setMessage($error->getMessage());
$log->setAction('appwrite-worker-functions');
$log->addTag('verboseType', get_class($error));
$log->addTag('code', $error->getCode());
$log->addExtra('file', $error->getFile());
$log->addExtra('line', $error->getLine());
$log->addExtra('trace', $error->getTraceAsString());
$log->addExtra('detailedTrace', $error->getTrace());
$log->addExtra('roles', \Utopia\Database\Validator\Authorization::$roles);
$isProduction = App::getEnv('_APP_ENV', 'development') === 'production';
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
$logger->addLog($log);
}
/** Check if build has exists */
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
if ($build->isEmpty()) {
throw new Exception('Build not found', 404);
}
Console::error('[Error] Type: ' . get_class($error));
Console::error('[Error] Message: ' . $error->getMessage());
Console::error('[Error] File: ' . $error->getFile());
Console::error('[Error] Line: ' . $error->getLine());
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready', 400);
}
$register->get('pools')->reclaim();
});
/** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
}
$runtime = $runtimes[$function->getAttribute('runtime')];
/** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) {
$executionId = ID::unique();
$execution = $dbForProject->createDocument('executions', new Document([
'$id' => $executionId,
'$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))],
'functionId' => $functionId,
'deploymentId' => $deploymentId,
'trigger' => $trigger,
'status' => 'waiting',
'statusCode' => 0,
'response' => '',
'stderr' => '',
'duration' => 0.0,
'search' => implode(' ', [$functionId, $executionId]),
]));
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
return $carry;
}, []);
/** Collect environment variables */
$vars = \array_merge($vars, [
'APPWRITE_FUNCTION_ID' => $functionId,
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'),
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
'APPWRITE_FUNCTION_TRIGGER' => $trigger,
'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '',
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '',
'APPWRITE_FUNCTION_EVENT' => $event ?? '',
'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '',
'APPWRITE_FUNCTION_DATA' => $data ?? '',
'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '',
'APPWRITE_FUNCTION_JWT' => $jwt ?? '',
]);
/** Execute function */
try {
$executionResponse = $this->executor->createExecution(
projectId: $project->getId(),
deploymentId: $deploymentId,
path: $build->getAttribute('outputPath', ''),
vars: $vars,
entrypoint: $deployment->getAttribute('entrypoint', ''),
data: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
runtime: $function->getAttribute('runtime', ''),
timeout: $function->getAttribute('timeout', 0),
baseImage: $runtime['image']
);
/** Update execution status */
$execution
->setAttribute('status', $executionResponse['status'])
->setAttribute('statusCode', $executionResponse['statusCode'])
->setAttribute('response', $executionResponse['response'])
->setAttribute('stdout', $executionResponse['stdout'])
->setAttribute('stderr', $executionResponse['stderr'])
->setAttribute('duration', $executionResponse['duration']);
} catch (\Throwable $th) {
$interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt()));
$execution
->setAttribute('duration', (float)$interval->format('%s.%f'))
->setAttribute('status', 'failed')
->setAttribute('statusCode', $th->getCode())
->setAttribute('stderr', $th->getMessage());
Console::error($th->getMessage());
}
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
/** Trigger Webhook */
$executionModel = new Execution();
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
$executionUpdate
->setProject($project)
->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
->trigger();
/** Trigger Functions */
$executionUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
'functionId' => $function->getId(),
'executionId' => $execution->getId()
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution
);
Realtime::send(
projectId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);
$usage
->setParam('projectId', $project->getId())
->setParam('functionId', $function->getId())
->setParam('executions.{scope}.compute', 1)
->setParam('executionStatus', $execution->getAttribute('status', ''))
->setParam('executionTime', $execution->getAttribute('duration'))
->setParam('networkRequestSize', 0)
->setParam('networkResponseSize', 0)
->submit();
}
}
public function shutdown(): void
{
}
}
$server->workerStart();
$server->start();

View file

@ -61,7 +61,8 @@
"utopia-php/websocket": "0.1.0",
"utopia-php/image": "0.5.*",
"utopia-php/orchestration": "0.6.*",
"utopia-php/pools": "0.1.*",
"utopia-php/queue": "0.4.0",
"utopia-php/pools": "dev-upgrade-cli as 0.2.0",
"resque/php-resque": "1.3.6",
"matomo/device-detector": "6.0.0",
"dragonmantank/cron-expression": "3.3.1",

213
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "f3beee3a829a19e53b311052111bde2c",
"content-hash": "a744959294e219fff6ea9c17f9fb0705",
"packages": [
{
"name": "adhocore/jwt",
@ -115,15 +115,15 @@
},
{
"name": "appwrite/php-runtimes",
"version": "0.11.0",
"version": "0.11.1",
"source": {
"type": "git",
"url": "https://github.com/appwrite/runtimes.git",
"reference": "547fc026e11c0946846a8ac690898f5bf53be101"
"reference": "9d74a477ba3333cbcfac565c46fcf19606b7b603"
},
"require": {
"php": ">=8.0",
"utopia-php/system": "0.4.*"
"utopia-php/system": "0.6.*"
},
"require-dev": {
"phpunit/phpunit": "^9.3",
@ -154,7 +154,7 @@
"php",
"runtimes"
],
"time": "2022-08-15T14:03:36+00:00"
"time": "2022-11-07T16:45:52+00:00"
},
{
"name": "chillerlan/php-qrcode",
@ -300,16 +300,16 @@
},
{
"name": "colinmollenhour/credis",
"version": "v1.13.1",
"version": "v1.14.0",
"source": {
"type": "git",
"url": "https://github.com/colinmollenhour/credis.git",
"reference": "85df015088e00daf8ce395189de22c8eb45c8d49"
"reference": "dccc8a46586475075fbb012d8bd523b8a938c2dc"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/colinmollenhour/credis/zipball/85df015088e00daf8ce395189de22c8eb45c8d49",
"reference": "85df015088e00daf8ce395189de22c8eb45c8d49",
"url": "https://api.github.com/repos/colinmollenhour/credis/zipball/dccc8a46586475075fbb012d8bd523b8a938c2dc",
"reference": "dccc8a46586475075fbb012d8bd523b8a938c2dc",
"shasum": ""
},
"require": {
@ -341,9 +341,9 @@
"homepage": "https://github.com/colinmollenhour/credis",
"support": {
"issues": "https://github.com/colinmollenhour/credis/issues",
"source": "https://github.com/colinmollenhour/credis/tree/v1.13.1"
"source": "https://github.com/colinmollenhour/credis/tree/v1.14.0"
},
"time": "2022-06-20T22:56:59+00:00"
"time": "2022-11-09T01:18:39+00:00"
},
{
"name": "composer/package-versions-deprecated",
@ -693,16 +693,16 @@
},
{
"name": "guzzlehttp/psr7",
"version": "2.4.1",
"version": "2.4.3",
"source": {
"type": "git",
"url": "https://github.com/guzzle/psr7.git",
"reference": "69568e4293f4fa993f3b0e51c9723e1e17c41379"
"reference": "67c26b443f348a51926030c83481b85718457d3d"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/guzzle/psr7/zipball/69568e4293f4fa993f3b0e51c9723e1e17c41379",
"reference": "69568e4293f4fa993f3b0e51c9723e1e17c41379",
"url": "https://api.github.com/repos/guzzle/psr7/zipball/67c26b443f348a51926030c83481b85718457d3d",
"reference": "67c26b443f348a51926030c83481b85718457d3d",
"shasum": ""
},
"require": {
@ -792,7 +792,7 @@
],
"support": {
"issues": "https://github.com/guzzle/psr7/issues",
"source": "https://github.com/guzzle/psr7/tree/2.4.1"
"source": "https://github.com/guzzle/psr7/tree/2.4.3"
},
"funding": [
{
@ -808,7 +808,7 @@
"type": "tidelift"
}
],
"time": "2022-08-28T14:45:39+00:00"
"time": "2022-10-26T14:07:24+00:00"
},
{
"name": "influxdb/influxdb-php",
@ -931,6 +931,72 @@
},
"time": "2021-02-04T16:20:16+00:00"
},
{
"name": "laravel/pint",
"version": "v1.2.0",
"source": {
"type": "git",
"url": "https://github.com/laravel/pint.git",
"reference": "1d276e4c803397a26cc337df908f55c2a4e90d86"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/laravel/pint/zipball/1d276e4c803397a26cc337df908f55c2a4e90d86",
"reference": "1d276e4c803397a26cc337df908f55c2a4e90d86",
"shasum": ""
},
"require": {
"ext-json": "*",
"ext-mbstring": "*",
"ext-tokenizer": "*",
"ext-xml": "*",
"php": "^8.0"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.11.0",
"illuminate/view": "^9.27",
"laravel-zero/framework": "^9.1.3",
"mockery/mockery": "^1.5.0",
"nunomaduro/larastan": "^2.2",
"nunomaduro/termwind": "^1.14.0",
"pestphp/pest": "^1.22.1"
},
"bin": [
"builds/pint"
],
"type": "project",
"autoload": {
"psr-4": {
"App\\": "app/",
"Database\\Seeders\\": "database/seeders/",
"Database\\Factories\\": "database/factories/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Nuno Maduro",
"email": "enunomaduro@gmail.com"
}
],
"description": "An opinionated code formatter for PHP.",
"homepage": "https://laravel.com",
"keywords": [
"format",
"formatter",
"lint",
"linter",
"php"
],
"support": {
"issues": "https://github.com/laravel/pint/issues",
"source": "https://github.com/laravel/pint"
},
"time": "2022-09-13T15:07:15+00:00"
},
{
"name": "matomo/device-detector",
"version": "6.0.0",
@ -2431,23 +2497,24 @@
},
{
"name": "utopia-php/pools",
"version": "0.1.0",
"version": "dev-upgrade-cli",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/pools.git",
"reference": "5a467a569a80aefc846a97dc195b4adc2fd71805"
"reference": "88a2c1ed2badbfdf2787ce0a12def2c988fc1097"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/pools/zipball/5a467a569a80aefc846a97dc195b4adc2fd71805",
"reference": "5a467a569a80aefc846a97dc195b4adc2fd71805",
"url": "https://api.github.com/repos/utopia-php/pools/zipball/88a2c1ed2badbfdf2787ce0a12def2c988fc1097",
"reference": "88a2c1ed2badbfdf2787ce0a12def2c988fc1097",
"shasum": ""
},
"require": {
"ext-mongodb": "*",
"ext-pdo": "*",
"ext-redis": "*",
"php": ">=8.0"
"php": ">=8.0",
"utopia-php/cli": "0.13.*"
},
"require-dev": {
"phpunit/phpunit": "^9.4",
@ -2478,9 +2545,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/pools/issues",
"source": "https://github.com/utopia-php/pools/tree/0.1.0"
"source": "https://github.com/utopia-php/pools/tree/upgrade-cli"
},
"time": "2022-10-11T19:31:07+00:00"
"time": "2022-11-04T08:33:04+00:00"
},
{
"name": "utopia-php/preloader",
@ -2535,6 +2602,67 @@
},
"time": "2020-10-24T07:04:59+00:00"
},
{
"name": "utopia-php/queue",
"version": "0.4.0",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/queue.git",
"reference": "0cad4cf4231377aa6c67956b51ba1954e0d02166"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/0cad4cf4231377aa6c67956b51ba1954e0d02166",
"reference": "0cad4cf4231377aa6c67956b51ba1954e0d02166",
"shasum": ""
},
"require": {
"php": ">=8.0",
"utopia-php/cli": "0.13.*",
"utopia-php/framework": "0.*.*"
},
"require-dev": {
"laravel/pint": "^0.2.3",
"phpstan/phpstan": "^1.8",
"phpunit/phpunit": "^9.5.5",
"swoole/ide-helper": "4.8.8",
"workerman/workerman": "^4.0"
},
"suggest": {
"ext-swoole": "Needed to support Swoole.",
"workerman/workerman": "Needed to support Workerman."
},
"type": "library",
"autoload": {
"psr-4": {
"Utopia\\Queue\\": "src/Queue"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Torsten Dittmann",
"email": "torsten@appwrite.io"
}
],
"description": "A powerful task queue.",
"keywords": [
"Tasks",
"framework",
"php",
"queue",
"upf",
"utopia"
],
"support": {
"issues": "https://github.com/utopia-php/queue/issues",
"source": "https://github.com/utopia-php/queue/tree/0.4.0"
},
"time": "2022-10-31T06:23:08+00:00"
},
{
"name": "utopia-php/registry",
"version": "0.5.0",
@ -2700,23 +2828,25 @@
},
{
"name": "utopia-php/system",
"version": "0.4.0",
"version": "0.6.0",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/system.git",
"reference": "67c92c66ce8f0cc925a00bca89f7a188bf9183c0"
"reference": "289c4327713deadc9c748b5317d248133a02f245"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/system/zipball/67c92c66ce8f0cc925a00bca89f7a188bf9183c0",
"reference": "67c92c66ce8f0cc925a00bca89f7a188bf9183c0",
"url": "https://api.github.com/repos/utopia-php/system/zipball/289c4327713deadc9c748b5317d248133a02f245",
"reference": "289c4327713deadc9c748b5317d248133a02f245",
"shasum": ""
},
"require": {
"laravel/pint": "1.2.*",
"php": ">=7.4"
},
"require-dev": {
"phpunit/phpunit": "^9.3",
"squizlabs/php_codesniffer": "^3.6",
"vimeo/psalm": "4.0.1"
},
"type": "library",
@ -2749,9 +2879,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/system/issues",
"source": "https://github.com/utopia-php/system/tree/0.4.0"
"source": "https://github.com/utopia-php/system/tree/0.6.0"
},
"time": "2021-02-04T14:14:49+00:00"
"time": "2022-11-07T13:51:59+00:00"
},
{
"name": "utopia-php/websocket",
@ -3574,16 +3704,16 @@
},
{
"name": "phpunit/php-code-coverage",
"version": "9.2.17",
"version": "9.2.18",
"source": {
"type": "git",
"url": "https://github.com/sebastianbergmann/php-code-coverage.git",
"reference": "aa94dc41e8661fe90c7316849907cba3007b10d8"
"reference": "12fddc491826940cf9b7e88ad9664cf51f0f6d0a"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/aa94dc41e8661fe90c7316849907cba3007b10d8",
"reference": "aa94dc41e8661fe90c7316849907cba3007b10d8",
"url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/12fddc491826940cf9b7e88ad9664cf51f0f6d0a",
"reference": "12fddc491826940cf9b7e88ad9664cf51f0f6d0a",
"shasum": ""
},
"require": {
@ -3639,7 +3769,7 @@
],
"support": {
"issues": "https://github.com/sebastianbergmann/php-code-coverage/issues",
"source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.17"
"source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.18"
},
"funding": [
{
@ -3647,7 +3777,7 @@
"type": "github"
}
],
"time": "2022-08-30T12:24:04+00:00"
"time": "2022-10-27T13:35:33+00:00"
},
{
"name": "phpunit/php-file-iterator",
@ -5402,11 +5532,18 @@
"version": "dev-feat-update-cache-lib",
"alias": "0.26.1",
"alias_normalized": "0.26.1.0"
},
{
"package": "utopia-php/pools",
"version": "dev-upgrade-cli",
"alias": "0.2.0",
"alias_normalized": "0.2.0.0"
}
],
"minimum-stability": "stable",
"stability-flags": {
"utopia-php/database": 20
"utopia-php/database": 20,
"utopia-php/pools": 20
},
"prefer-stable": false,
"prefer-lowest": false,
@ -5431,5 +5568,5 @@
"platform-overrides": {
"php": "8.0"
},
"plugin-api-version": "2.3.0"
"plugin-api-version": "2.2.0"
}

View file

@ -6,6 +6,7 @@ use Appwrite\Utopia\Response;
use Appwrite\Utopia\Response\Model;
use Utopia\Database\Role;
class Execution extends Model
{
public function __construct()

View file

@ -81,18 +81,6 @@ class Func extends Model
'default' => '',
'example' => '5 4 * * *',
])
->addRule('scheduleNext', [
'type' => self::TYPE_DATETIME,
'description' => 'Function\'s next scheduled execution time in ISO 8601 format.',
'default' => '',
'example' => self::TYPE_DATETIME_EXAMPLE,
])
->addRule('schedulePrevious', [
'type' => self::TYPE_DATETIME,
'description' => 'Function\'s previous scheduled execution time in ISO 8601 format.',
'default' => '',
'example' => self::TYPE_DATETIME_EXAMPLE,
])
->addRule('timeout', [
'type' => self::TYPE_INTEGER,
'description' => 'Function execution timeout in seconds.',