1
0
Fork 0
mirror of synced 2024-06-15 09:14:50 +12:00

feat: fix executor issues

This commit is contained in:
Christy Jacob 2022-02-05 23:49:57 +04:00
parent f83fdc92de
commit 1b0a02e20d
4 changed files with 219 additions and 165 deletions

View file

@ -332,7 +332,7 @@ App::put('/v1/functions/:functionId')
])));
if ($next && $schedule !== $original) {
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
ResqueScheduler::enqueueAt($next, Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, [
'projectId' => $project->getId(),
'webhooks' => $project->getAttribute('webhooks', []),
'functionId' => $function->getId(),
@ -761,13 +761,14 @@ App::post('/v1/functions/:functionId/executions')
}
$runtimes = Config::getParam('runtimes', []);
$key = $function->getAttribute('runtime', '');
$runtime = isset($runtimes[$key]) ? $runtimes[$key] : null;
$runtime = (isset($runtimes[$function->getAttribute('runtime', '')])) ? $runtimes[$function->getAttribute('runtime', '')] : null;
if (\is_null($runtime)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
}
$deployment = Authorization::skip(fn() => $dbForProject->getDocument('deployments', $function->getAttribute('deployment')));
$deployment = Authorization::skip(fn() => $dbForProject->getDocument('deployments', $function->getAttribute('deployment', '')));
if ($deployment->getAttribute('resourceId') !== $function->getId()) {
throw new Exception('Deployment not found. Deploy deployment before trying to execute a function', 404);
@ -784,7 +785,7 @@ App::post('/v1/functions/:functionId/executions')
}
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not completed', 400);
throw new Exception('Build not ready', 400);
}
$validator = new Authorization('execute');
@ -798,7 +799,7 @@ App::post('/v1/functions/:functionId/executions')
$execution = Authorization::skip(fn() => $dbForProject->createDocument('executions', new Document([
'$id' => $executionId,
'$read' => (!$user->isEmpty()) ? ['user:' . $user->getId()] : [],
'$write' => ['role:all'],
'$write' => [],
'dateCreated' => time(),
'functionId' => $function->getId(),
'deploymentId' => $deployment->getId(),
@ -833,32 +834,23 @@ App::post('/v1/functions/:functionId/executions')
}
if ($async) {
Resque::enqueue('v1-functions', 'FunctionsV1', [
Resque::enqueue(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, [
'projectId' => $project->getId(),
'deploymentId' => $deployment->getId(),
'buildId' => $deployment->getAttribute('buildId', ''),
'path' => $build->getAttribute('outputPath', ''),
'vars' => $function->getAttribute('vars', []),
'data' => $data,
'runtime' => $function->getAttribute('runtime', ''),
'timeout' => $function->getAttribute('timeout', 0),
'baseImage' => $runtime['image'],
'webhooks' => $project->getAttribute('webhooks', []),
'userId' => $user->getId(),
'functionId' => $function->getId(),
'webhooks' => $project->getAttribute('webhooks', []),
'executionId' => $execution->getId(),
'trigger' => 'http',
'data' => $data,
'userId' => $user->getId(),
'jwt' => $jwt,
]);
$response->setStatusCode(Response::STATUS_CODE_CREATED);
$response->dynamic($execution, Response::MODEL_EXECUTION);
return $response;
}
/** Send variables */
$vars = \array_merge($function->getAttribute('vars', []), [
'ENTRYPOINT_NAME' => $deployment->getAttribute('entrypoint', ''),
'APPWRITE_FUNCTION_ID' => $function->getId(),
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
'APPWRITE_FUNCTION_DEPLOYMENT' => $deployment->getId(),
@ -871,9 +863,8 @@ App::post('/v1/functions/:functionId/executions')
'APPWRITE_FUNCTION_JWT' => $jwt,
]);
// Directly execute function.
/** Execute function */
$executor = new Executor();
$responseExecute = $executor->createExecution(
projectId: $project->getId(),
functionId: $function->getId(),
@ -882,6 +873,7 @@ App::post('/v1/functions/:functionId/executions')
path: $build->getAttribute('outputPath', ''),
vars: $vars,
data: $data,
entrypoint: $deployment->getAttribute('entrypoint', ''),
runtime: $function->getAttribute('runtime', ''),
timeout: $function->getAttribute('timeout', 0),
baseImage: $runtime['image'],
@ -889,6 +881,7 @@ App::post('/v1/functions/:functionId/executions')
userId: $user->getId(),
);
/** Update execution status */
$execution->setAttribute('status', $responseExecute['status']);
$execution->setAttribute('statusCode', $responseExecute['statusCode']);
$execution->setAttribute('stdout', $responseExecute['stdout']);

View file

@ -1,11 +1,7 @@
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Stats\Stats;
use Appwrite\Utopia\Response;
use Appwrite\Utopia\Response\Model\Execution;
use Swoole\ConnectionPool;
use Swoole\Coroutine as Co;
use Swoole\Http\Request as SwooleRequest;
@ -260,7 +256,7 @@ function createRuntimeServer(string $projectId, string $deploymentId, array $bui
}
};
function execute(string $projectId, string $functionId, string $deploymentId, array $build, array $vars, string $data, string $userId, string $baseImage, string $runtime, int $timeout, array $webhooks = []): array
function execute(string $projectId, string $functionId, string $deploymentId, array $build, array $vars, string $data, string $userId, string $baseImage, string $runtime, string $entrypoint, int $timeout, array $webhooks = []): array
{
Console::info('Executing function: ' . $functionId);
@ -296,7 +292,7 @@ function execute(string $projectId, string $functionId, string $deploymentId, ar
$body = \json_encode([
'path' => '/usr/code',
'file' => $vars['ENTRYPOINT_NAME'],
'file' => $entrypoint,
'env' => $vars,
'payload' => $data,
'timeout' => $timeout ?? (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900)
@ -348,8 +344,6 @@ function execute(string $projectId, string $functionId, string $deploymentId, ar
throw new Exception('An internal curl error has occurred within the executor! Error Msg: ' . $error, 500);
}
var_dump($executorResponse);
$executionData = [];
if (!empty($executorResponse)) {
@ -390,44 +384,7 @@ function execute(string $projectId, string $functionId, string $deploymentId, ar
'time' => $executionTime,
];
/** Trigger event */
$executionModel = new Execution();
$executionUpdate = new Event('v1-webhooks', 'WebhooksV1');
$executionUpdate
->setParam('projectId', $projectId)
->setParam('userId', $userId)
->setParam('webhooks', $webhooks)
->setParam('event', 'functions.executions.update')
->setParam('eventData', (new Document($execution))->getArrayCopy(array_keys($executionModel->getRules())));
$executionUpdate->trigger();
/** Trigger realtime event */
$target = Realtime::fromPayload('functions.executions.update', new Document($execution));
Realtime::send(
projectId: $projectId,
payload: $execution,
event: 'functions.executions.update',
channels: $target['channels'],
roles: $target['roles']
);
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);
$usage
->setParam('projectId', $projectId)
->setParam('functionId', $functionId)
->setParam('functionExecution', 1)
->setParam('functionStatus', $functionStatus)
->setParam('functionExecutionTime', $executionTime * 1000) // ms
->setParam('networkRequestSize', 0)
->setParam('networkResponseSize', 0)
->submit();
$usage->submit();
}
return $execution;
};
function runBuildStage(string $buildId, string $projectID, string $path, array $vars, string $baseImage, string $runtime): array
@ -651,6 +608,7 @@ App::post('/v1/functions/:functionId/executions')
->param('vars', '', new Assoc(), 'Environment Variables required for the build', false)
->param('data', '', new Text(8192), 'Data to be forwarded to the function, this is user specified.', true)
->param('runtime', '', new Text(128), 'Runtime for the cloud function', false)
->param('entrypoint', '', new Text(256), 'Entrypoint of the code file')
->param('timeout', 15, new ValidatorRange(1, 900), 'Function maximum execution time in seconds.', true)
->param('baseImage', '', new Text(128), 'Base image name of the runtime', false)
->param('webhooks', [], new ArrayList(new JSON()), 'Any webhooks that need to be triggered after this execution', true)
@ -658,7 +616,7 @@ App::post('/v1/functions/:functionId/executions')
->inject('projectId')
->inject('response')
->action(
function (string $functionId, string $deploymentId, string $buildId, string $path, array $vars, string $data, string $runtime, $timeout, string $baseImage, array $webhooks, string $userId, string $projectId, Response $response) {
function (string $functionId, string $deploymentId, string $buildId, string $path, array $vars, string $data, string $runtime, string $entrypoint, $timeout, string $baseImage, array $webhooks, string $userId, string $projectId, Response $response) {
$build = [
'$id' => $buildId,
@ -666,7 +624,7 @@ App::post('/v1/functions/:functionId/executions')
];
// Send both data and vars from the caller
$execution = execute($projectId, $functionId, $deploymentId, $build, $vars, $data, $userId, $baseImage, $runtime, $timeout, $webhooks);
$execution = execute($projectId, $functionId, $deploymentId, $build, $vars, $data, $userId, $baseImage, $runtime, $entrypoint, $timeout, $webhooks);
$response
->setStatusCode(Response::STATUS_CODE_OK)

View file

@ -1,7 +1,12 @@
<?php
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Appwrite\Stats\Stats;
use Appwrite\Utopia\Response\Model\Execution;
use Cron\CronExpression;
use Executor\Executor;
use Swoole\Runtime;
use Utopia\App;
use Utopia\CLI\Console;
@ -33,6 +38,11 @@ Console::success('Finished warmup in ' . $warmupTime . ' seconds');
class FunctionsV1 extends Worker
{
/**
* @var Executor
*/
private $executor = null;
public array $args = [];
public array $allowed = [];
@ -43,6 +53,7 @@ class FunctionsV1 extends Worker
public function init(): void
{
$this->executor = new Executor();
}
public function run(): void
@ -70,13 +81,7 @@ class FunctionsV1 extends Worker
/** @var Document[] $functions */
while ($sum >= $limit) {
Authorization::disable();
$functions = $database->find('functions', [], $limit, $offset, ['name'], [Database::ORDER_ASC]);
Authorization::reset();
$functions = Authorization::skip(fn() => $database->find('functions', [], $limit, $offset, ['name'], [Database::ORDER_ASC]));
$sum = \count($functions);
$offset = $offset + $limit;
@ -84,31 +89,31 @@ class FunctionsV1 extends Worker
foreach ($functions as $function) {
$events = $function->getAttribute('events', []);
$deployment = $function->getAttribute('deployment', []);
Console::success('Itterating function: ' . $function->getAttribute('name'));
if (!\in_array($event, $events) || empty($deployment)) {
if (!\in_array($event, $events)) {
continue;
}
Console::success('Triggered function: ' . $event);
Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute(
trigger: 'event',
projectId: $projectId,
executionId: '',
database: $database,
function: $function,
dbForProject: $database,
executionId: $executionId,
webhooks: $webhooks,
trigger: $trigger,
event: $event,
eventData: $eventData,
data: $data,
webhooks: $webhooks,
userId: $userId,
jwt: $jwt
);
Console::success('Triggered function: ' . $event);
}
}
break;
case 'schedule':
@ -126,9 +131,7 @@ class FunctionsV1 extends Worker
*/
// Reschedule
Authorization::disable();
$function = $database->getDocument('functions', $functionId);
Authorization::reset();
$function = Authorization::skip(fn() => $database->getDocument('functions', $functionId));
if (empty($function->getId())) {
throw new Exception('Function not found ('.$functionId.')');
@ -145,19 +148,18 @@ class FunctionsV1 extends Worker
->setAttribute('scheduleNext', $next)
->setAttribute('schedulePrevious', \time());
Authorization::disable();
$function = Authorization::skip(function() use ($database, $function, $next, $functionId) {
$function = $database->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [
'scheduleNext' => (int)$next,
])));
if ($function === false) {
throw new Exception('Function update failed (' . $functionId . ')');
}
return $function;
});
$function = $database->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [
'scheduleNext' => (int)$next,
])));
if ($function === false) {
throw new Exception('Function update failed (' . $functionId . ')');
}
Authorization::reset();
ResqueScheduler::enqueueAt($next, 'v1-functions', 'FunctionsV1', [
ResqueScheduler::enqueueAt($next, Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME, [
'projectId' => $projectId,
'webhooks' => $webhooks,
'functionId' => $function->getId(),
@ -168,101 +170,200 @@ class FunctionsV1 extends Worker
]); // Async task reschedule
$this->execute(
trigger: $trigger,
projectId: $projectId,
executionId: $executionId,
database: $database,
function: $function,
data: $data,
dbForProject: $database,
executionId: $executionId,
webhooks: $webhooks,
trigger: $trigger,
event: $event,
eventData: $eventData,
data: $data,
userId: $userId,
jwt: $jwt
);
break;
case 'http':
Authorization::disable();
$function = $database->getDocument('functions', $functionId);
Authorization::reset();
$function = Authorization::skip(fn() => $database->getDocument('functions', $functionId));
if (empty($function->getId())) {
throw new Exception('Function not found ('.$functionId.')');
}
$deployment = Authorization::skip(fn() => $database->getDocument('deployments', $function->getAttribute('deployment')));
$build = Authorization::skip(fn() => $database->getDocument('builds', $deployment->getAttribute('build')));
$path = $build->getAttribute('path', '');
$this->execute(
trigger: $trigger,
projectId: $projectId,
executionId: $executionId,
path: $path,
buildId: $deployment->getAttribute('buildId', ''),
deploymentId: $deployment->getId(),
database: $database,
function: $function,
data: $data,
dbForProject: $database,
executionId: $executionId,
webhooks: $webhooks,
trigger: $trigger,
event: $event,
eventData: $eventData,
data: $data,
userId: $userId,
jwt: $jwt
);
break;
}
}
/**
* Execute function deployment
*
* @param string $trigger
* @param string $projectId
* @param string $executionId
* @param Database $database
* @param Document $function
* @param string $event
* @param string $eventData
* @param string $data
* @param array $webhooks
* @param string $userId
* @param string $jwt
*
* @return void
*/
public function execute(string $trigger, string $path, string $projectId, string $deploymentId, string $buildId, string $executionId, Database $database, Document $function, string $event = '', string $eventData = '', string $data = '', array $webhooks = [], string $userId = '', string $jwt = ''): void
{
$ch = \curl_init();
\curl_setopt($ch, CURLOPT_URL, "http://appwrite-executor/v1/functions/{$function->getId()}/executions");
\curl_setopt($ch, CURLOPT_POST, true);
\curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode([
'deploymentId' => $deploymentId,
'buildId' => $buildId,
'path' => $path,
'vars' => $function->getAttribute('vars', []),
'data' => $data,
'runtime' => $function->getAttribute('runtime', ''),
'timeout' => $function->getAttribute('timeout', 0),
'baseImage' => '',
'webhooks' => $webhooks,
'userId' => $userId,
]));
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
\curl_setopt($ch, CURLOPT_TIMEOUT, App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900) + 200); // + 200 for safety margin
\curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10);
\curl_setopt($ch, CURLOPT_HTTPHEADER, [
'Content-Type: application/json',
'x-appwrite-project: '.$projectId,
'x-appwrite-executor-key: '. App::getEnv('_APP_EXECUTOR_SECRET', '')
]);
private function execute(
string $projectId,
Document $function,
Database $dbForProject,
string $executionId,
array $webhooks,
string $trigger,
string $event,
string $eventData,
string $data,
string $userId,
string $jwt
) {
\curl_exec($ch);
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
$error = \curl_error($ch);
if (!empty($error)) {
Console::error('Curl error: '.$error);
/** Check if deployment exists */
$deployment = Authorization::skip(fn() => $dbForProject->getDocument('deployments', $deploymentId));
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
}
\curl_close($ch);
if ($deployment->isEmpty()) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
}
/** Check if build has exists */
$build = Authorization::skip(fn() => $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')));
if ($build->isEmpty()) {
throw new Exception('Build not found', 404);
}
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready', 400);
}
/** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []);
$runtime = (isset($runtimes[$function->getAttribute('runtime', '')])) ? $runtimes[$function->getAttribute('runtime', '')] : null;
if (\is_null($runtime)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
}
/** Create execution or update execution status */
$execution = Authorization::skip(function() use ($dbForProject, &$executionId, $functionId, $deploymentId, $trigger, $userId) {
$execution = $dbForProject->getDocument('executions', $executionId);
if ($execution->isEmpty()) {
$executionId = $dbForProject->getId();
$execution = $dbForProject->createDocument('executions', new Document([
'$id' => $executionId,
'$read' => $userId ? ['user:' . $userId] : [],
'$write' => [],
'dateCreated' => time(),
'functionId' => $functionId,
'deploymentId' => $deploymentId,
'trigger' => $trigger,
'status' => 'waiting',
'statusCode' => 0,
'stdout' => '',
'stderr' => '',
'time' => 0.0,
'search' => implode(' ', [$functionId, $executionId]),
]));
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
return $execution;
});
/** Collect environment variables */
$vars = [
'APPWRITE_FUNCTION_ID' => $functionId,
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'],
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'],
'APPWRITE_FUNCTION_TRIGGER' => $trigger,
'APPWRITE_FUNCTION_EVENT' => $event,
'APPWRITE_FUNCTION_EVENT_DATA' => $eventData,
'APPWRITE_FUNCTION_DATA' => $data,
'APPWRITE_FUNCTION_PROJECT_ID' => $projectId,
'APPWRITE_FUNCTION_USER_ID' => $userId,
'APPWRITE_FUNCTION_JWT' => $jwt,
];
$vars = \array_merge($function->getAttribute('vars', []), $vars);
/** Execute function */
$executionResponse = $this->executor->createExecution(
projectId: $projectId,
functionId: $functionId,
deploymentId: $deploymentId,
buildId: $deployment->getAttribute('buildId', ''),
path: $build->getAttribute('outputPath', ''),
vars: $vars,
entrypoint: $deployment->getAttribute('entrypoint', ''),
data: $vars['APPWRITE_FUNCTION_DATA'],
runtime: $function->getAttribute('runtime', ''),
timeout: $function->getAttribute('timeout', 0),
baseImage: $runtime['image'],
webhooks: $webhooks,
userId: $userId,
);
/** Update execution status */
$execution->setAttribute('status', $executionResponse['status']);
$execution->setAttribute('statusCode', $executionResponse['statusCode']);
$execution->setAttribute('stdout', $executionResponse['stdout']);
$execution->setAttribute('stderr', $executionResponse['stderr']);
$execution->setAttribute('time', $executionResponse['time']);
$execution = Authorization::skip(fn() => $dbForProject->updateDocument('executions', $executionId, $execution));
/** Trigger Webhook */
$executionModel = new Execution();
$executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
$executionUpdate
->setParam('projectId', $projectId)
->setParam('userId', $userId)
->setParam('webhooks', $webhooks)
->setParam('event', 'functions.executions.update')
->setParam('eventData', $execution->getArrayCopy(array_keys($executionModel->getRules())));
$executionUpdate->trigger();
/** Trigger realtime event */
$target = Realtime::fromPayload('functions.executions.update', $execution);
Realtime::send(
projectId: $projectId,
payload: $execution->getArrayCopy(),
event: 'functions.executions.update',
channels: $target['channels'],
roles: $target['roles']
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);
$usage
->setParam('projectId', $projectId)
->setParam('functionId', $function->getId())
->setParam('functionExecution', 1)
->setParam('functionStatus', $execution->getAttribute('status', ''))
->setParam('functionExecutionTime', $execution->getAttribute('time') * 1000) // ms
->setParam('networkRequestSize', 0)
->setParam('networkResponseSize', 0)
->submit();
$usage->submit();
}
}
public function shutdown(): void

View file

@ -92,6 +92,7 @@ class Executor
string $buildId,
string $path,
array $vars,
string $entrypoint,
string $data,
string $runtime,
string $baseImage,
@ -117,6 +118,7 @@ class Executor
'baseImage' => $baseImage,
'webhooks' => $webhooks,
'userId' => $userId,
'entrypoint' => $entrypoint
];
$response = $this->call(self::METHOD_POST, $route, $headers, $params, true, 30);