feat: add start up logic
This commit is contained in:
parent
b42da9e996
commit
66b47973bc
3 changed files with 173 additions and 156 deletions
325
app/executor.php
325
app/executor.php
|
@ -1,6 +1,7 @@
|
|||
<?php
|
||||
require_once __DIR__ . '/../vendor/autoload.php';
|
||||
|
||||
use Appwrite\Runtimes\Runtimes;
|
||||
use Swoole\ConnectionPool;
|
||||
use Swoole\Coroutine as Co;
|
||||
use Swoole\Http\Request as SwooleRequest;
|
||||
|
@ -22,6 +23,18 @@ use Utopia\Validator\Assoc;
|
|||
use Utopia\Validator\Range as ValidatorRange;
|
||||
use Utopia\Validator\Text;
|
||||
|
||||
|
||||
// EXECUTOR_ pattern
|
||||
// TODO
|
||||
// Discuss and fix startup logic
|
||||
// - Pull runtmies
|
||||
// - Remove orphans
|
||||
// Maintenance job
|
||||
// - delete pending runtimes older than X minutes ? ENV_VARS
|
||||
// Implement other endpoints
|
||||
// Get list of supported runtimes on startup - Done
|
||||
//
|
||||
|
||||
Swoole\Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL);
|
||||
|
||||
function logError(Throwable $error, string $action, Utopia\Route $route = null)
|
||||
|
@ -71,12 +84,14 @@ $orchestrationPool = new ConnectionPool(function () {
|
|||
$dockerUser = App::getEnv('DOCKERHUB_PULL_USERNAME', null);
|
||||
$dockerPass = App::getEnv('DOCKERHUB_PULL_PASSWORD', null);
|
||||
$orchestration = new Orchestration(new DockerCLI($dockerUser, $dockerPass));
|
||||
|
||||
return $orchestration;
|
||||
}, 6);
|
||||
|
||||
try {
|
||||
$runtimes = Config::getParam('runtimes');
|
||||
|
||||
$runtimes = new Runtimes();
|
||||
$allowList = empty(App::getEnv('_APP_FUNCTIONS_RUNTIMES')) ? [] : \explode(',', App::getEnv('_APP_FUNCTIONS_RUNTIMES'));
|
||||
$runtimes = $runtimes->getAll(true, $allowList);
|
||||
|
||||
// Warmup: make sure images are ready to run fast 🚀
|
||||
Co\run(function () use ($runtimes, $orchestrationPool) {
|
||||
|
@ -104,6 +119,7 @@ try {
|
|||
|
||||
$activeFunctions = new Swoole\Table(1024);
|
||||
$activeFunctions->column('id', Swoole\Table::TYPE_STRING, 512);
|
||||
$activeFunctions->column('created', Swoole\Table::TYPE_INT, 512);
|
||||
$activeFunctions->column('name', Swoole\Table::TYPE_STRING, 512);
|
||||
$activeFunctions->column('status', Swoole\Table::TYPE_STRING, 512);
|
||||
$activeFunctions->column('key', Swoole\Table::TYPE_STRING, 4096);
|
||||
|
@ -136,137 +152,6 @@ try {
|
|||
call_user_func($logError, $error, "startupError");
|
||||
}
|
||||
|
||||
function execute(string $runtimeId, string $path, array $vars, string $data, string $baseImage, string $runtime, string $entrypoint, int $timeout): array
|
||||
{
|
||||
|
||||
Console::info('Executing Runtime: ' . $runtimeId);
|
||||
|
||||
global $activeFunctions;
|
||||
$container = 'runtime-' . $runtimeId;
|
||||
|
||||
/** Create a new runtime server if there's none running */
|
||||
// if (!$activeFunctions->exists($container)) {
|
||||
// Console::info("Runtime server for $runtimeId not running. Creating new one...");
|
||||
// createRuntimeServer($runtimeId, $path, $vars, $baseImage, $runtime);
|
||||
// }
|
||||
|
||||
$key = $activeFunctions->get('runtime-' . $runtimeId, 'key');
|
||||
|
||||
$stdout = '';
|
||||
$stderr = '';
|
||||
|
||||
$executionStart = \microtime(true);
|
||||
|
||||
$statusCode = 0;
|
||||
|
||||
$errNo = -1;
|
||||
$attempts = 0;
|
||||
$max = 5;
|
||||
|
||||
$executorResponse = '';
|
||||
|
||||
// cURL request to runtime
|
||||
do {
|
||||
$attempts++;
|
||||
$ch = \curl_init();
|
||||
|
||||
$body = \json_encode([
|
||||
'path' => '/usr/code',
|
||||
'file' => $entrypoint,
|
||||
'env' => $vars,
|
||||
'payload' => $data,
|
||||
'timeout' => $timeout ?? (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900)
|
||||
]);
|
||||
|
||||
\curl_setopt($ch, CURLOPT_URL, "http://" . $container . ":3000/");
|
||||
\curl_setopt($ch, CURLOPT_POST, true);
|
||||
\curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
|
||||
|
||||
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
|
||||
\curl_setopt($ch, CURLOPT_TIMEOUT, $timeout ?? (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900));
|
||||
\curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10);
|
||||
|
||||
\curl_setopt($ch, CURLOPT_HTTPHEADER, [
|
||||
'Content-Type: application/json',
|
||||
'Content-Length: ' . \strlen($body),
|
||||
'x-internal-challenge: ' . $key,
|
||||
'host: null'
|
||||
]);
|
||||
|
||||
$executorResponse = \curl_exec($ch);
|
||||
|
||||
$statusCode = \curl_getinfo($ch, CURLINFO_HTTP_CODE);
|
||||
|
||||
$error = \curl_error($ch);
|
||||
|
||||
$errNo = \curl_errno($ch);
|
||||
|
||||
\curl_close($ch);
|
||||
if ($errNo != CURLE_COULDNT_CONNECT && $errNo != 111) {
|
||||
break;
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
} while ($attempts < $max);
|
||||
|
||||
if ($attempts >= 5) {
|
||||
$stderr = 'Failed to connect to executor runtime after 5 attempts.';
|
||||
$statusCode = 124;
|
||||
}
|
||||
|
||||
// If timeout error
|
||||
if (in_array($errNo, [CURLE_OPERATION_TIMEDOUT, 110])) {
|
||||
$statusCode = 124;
|
||||
}
|
||||
|
||||
// 110 is the Swoole error code for timeout, see: https://www.swoole.co.uk/docs/swoole-error-code
|
||||
if ($errNo !== 0 && $errNo !== CURLE_COULDNT_CONNECT && $errNo !== CURLE_OPERATION_TIMEDOUT && $errNo !== 110) {
|
||||
throw new Exception('An internal curl error has occurred within the executor! Error Msg: ' . $error, 500);
|
||||
}
|
||||
|
||||
$executionData = [];
|
||||
|
||||
if (!empty($executorResponse)) {
|
||||
$executionData = json_decode($executorResponse, true);
|
||||
}
|
||||
|
||||
if (isset($executionData['code'])) {
|
||||
$statusCode = $executionData['code'];
|
||||
}
|
||||
|
||||
if ($statusCode === 500) {
|
||||
if (isset($executionData['message'])) {
|
||||
$stderr = $executionData['message'];
|
||||
} else {
|
||||
$stderr = 'Internal Runtime error';
|
||||
}
|
||||
} else if ($statusCode === 124) {
|
||||
$stderr = 'Execution timed out.';
|
||||
} else if ($statusCode === 0) {
|
||||
$stderr = 'Execution failed.';
|
||||
} else if ($statusCode >= 200 && $statusCode < 300) {
|
||||
$stdout = $executorResponse;
|
||||
} else {
|
||||
$stderr = 'Execution failed.';
|
||||
}
|
||||
|
||||
$executionEnd = \microtime(true);
|
||||
$executionTime = ($executionEnd - $executionStart);
|
||||
$functionStatus = ($statusCode >= 200 && $statusCode < 300) ? 'completed' : 'failed';
|
||||
|
||||
Console::success('Function executed in ' . $executionTime . ' seconds, status: ' . $functionStatus);
|
||||
|
||||
$execution = [
|
||||
'status' => $functionStatus,
|
||||
'statusCode' => $statusCode,
|
||||
'stdout' => \utf8_encode(\mb_substr($stdout, -8000)),
|
||||
'stderr' => \utf8_encode(\mb_substr($stderr, -8000)),
|
||||
'time' => $executionTime,
|
||||
];
|
||||
|
||||
return $execution;
|
||||
};
|
||||
|
||||
|
||||
// POST /v1/runtimes
|
||||
App::post('/v1/runtimes')
|
||||
|
@ -275,6 +160,7 @@ App::post('/v1/runtimes')
|
|||
->param('source', '', new Text(0), 'Path to source files.')
|
||||
->param('destination', '', new Text(0), 'Destination folder to store build files into.')
|
||||
->param('vars', '', new Assoc(), 'Environment Variables required for the build')
|
||||
// refactor to `name`
|
||||
->param('runtime', '', new Text(128), 'Runtime for the cloud function')
|
||||
->param('baseImage', '', new Text(128), 'Base image name of the runtime')
|
||||
->inject('response')
|
||||
|
@ -455,6 +341,7 @@ App::post('/v1/runtimes')
|
|||
$buildStderr = $th->getMessage();
|
||||
$build = [
|
||||
'status' => 'failed',
|
||||
// Increase logs limit
|
||||
'stdout' => \utf8_encode(\mb_substr($buildStdout, -4096)),
|
||||
'stderr' => \utf8_encode(\mb_substr($buildStderr, -4096)),
|
||||
'startTime' => $buildStart,
|
||||
|
@ -471,7 +358,7 @@ App::post('/v1/runtimes')
|
|||
|
||||
if ( $build['status'] !== 'ready') {
|
||||
return $response
|
||||
->setStatusCode(201)
|
||||
->setStatusCode(500)
|
||||
->json($build);
|
||||
}
|
||||
|
||||
|
@ -566,6 +453,7 @@ App::get('/v1/runtimes')
|
|||
// TODO : Get list of active runtimes from swoole table
|
||||
$runtimes = [];
|
||||
|
||||
// Response model for runtime list
|
||||
$response
|
||||
->setStatusCode(200)
|
||||
->json($runtimes);
|
||||
|
@ -639,8 +527,133 @@ App::post('/v1/execution')
|
|||
->action(
|
||||
function (string $runtimeId, string $path, array $vars, string $data, string $runtime, string $entrypoint, $timeout, string $baseImage, Response $response) {
|
||||
|
||||
// Send both data and vars from the caller
|
||||
$execution = execute($runtimeId, $path, $vars, $data, $baseImage, $runtime, $entrypoint, $timeout);
|
||||
global $activeFunctions;
|
||||
|
||||
$container = 'runtime-' . $runtimeId;
|
||||
|
||||
// TODO: Also check for container status
|
||||
if (!$activeFunctions->exists($container)) {
|
||||
throw new Exception('Runtime not found. Please create the runtime.', 404);
|
||||
}
|
||||
|
||||
$secret = $activeFunctions->get($container, 'key');
|
||||
if (empty($secret)) {
|
||||
throw new Exception('Runtime secret not found. Please create the runtime.', 500);
|
||||
}
|
||||
|
||||
Console::info('Executing Runtime: ' . $runtimeId);
|
||||
|
||||
$executionStart = \microtime(true);
|
||||
$stdout = '';
|
||||
$stderr = '';
|
||||
$statusCode = 0;
|
||||
$errNo = -1;
|
||||
$executorResponse = '';
|
||||
|
||||
try {
|
||||
$attempts = 0;
|
||||
$max = 5;
|
||||
// cURL request to runtime
|
||||
do {
|
||||
$attempts++;
|
||||
$ch = \curl_init();
|
||||
|
||||
$body = \json_encode([
|
||||
'path' => '/usr/code',
|
||||
'file' => $entrypoint,
|
||||
'env' => $vars,
|
||||
'payload' => $data,
|
||||
'timeout' => $timeout ?? (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900)
|
||||
]);
|
||||
|
||||
\curl_setopt($ch, CURLOPT_URL, "http://" . $container . ":3000/");
|
||||
\curl_setopt($ch, CURLOPT_POST, true);
|
||||
\curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
|
||||
|
||||
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
|
||||
\curl_setopt($ch, CURLOPT_TIMEOUT, $timeout ?? (int) App::getEnv('_APP_FUNCTIONS_TIMEOUT', 900));
|
||||
\curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10);
|
||||
|
||||
\curl_setopt($ch, CURLOPT_HTTPHEADER, [
|
||||
'Content-Type: application/json',
|
||||
'Content-Length: ' . \strlen($body),
|
||||
'x-internal-challenge: ' . $secret,
|
||||
'host: null'
|
||||
]);
|
||||
|
||||
$executorResponse = \curl_exec($ch);
|
||||
|
||||
$statusCode = \curl_getinfo($ch, CURLINFO_HTTP_CODE);
|
||||
|
||||
$error = \curl_error($ch);
|
||||
|
||||
$errNo = \curl_errno($ch);
|
||||
|
||||
\curl_close($ch);
|
||||
if ($errNo != CURLE_COULDNT_CONNECT && $errNo != 111) {
|
||||
break;
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
} while ($attempts < $max);
|
||||
|
||||
if ($attempts >= 5) {
|
||||
$stderr = 'Failed to connect to executor runtime after 5 attempts.';
|
||||
$statusCode = 124;
|
||||
}
|
||||
|
||||
// If timeout error
|
||||
if (in_array($errNo, [CURLE_OPERATION_TIMEDOUT, 110])) {
|
||||
$statusCode = 124;
|
||||
}
|
||||
|
||||
// 110 is the Swoole error code for timeout, see: https://www.swoole.co.uk/docs/swoole-error-code
|
||||
if ($errNo !== 0 && $errNo !== CURLE_COULDNT_CONNECT && $errNo !== CURLE_OPERATION_TIMEDOUT && $errNo !== 110) {
|
||||
throw new Exception('An internal curl error has occurred within the executor! Error Msg: ' . $error, 500);
|
||||
}
|
||||
|
||||
$executionData = [];
|
||||
|
||||
if (!empty($executorResponse)) {
|
||||
$executionData = json_decode($executorResponse, true);
|
||||
}
|
||||
|
||||
if (isset($executionData['code'])) {
|
||||
$statusCode = $executionData['code'];
|
||||
}
|
||||
|
||||
if ($statusCode === 500) {
|
||||
if (isset($executionData['message'])) {
|
||||
$stderr = $executionData['message'];
|
||||
} else {
|
||||
$stderr = 'Internal Runtime error';
|
||||
}
|
||||
} else if ($statusCode === 124) {
|
||||
$stderr = 'Execution timed out.';
|
||||
} else if ($statusCode === 0) {
|
||||
$stderr = 'Execution failed.';
|
||||
} else if ($statusCode >= 200 && $statusCode < 300) {
|
||||
$stdout = $executorResponse;
|
||||
} else {
|
||||
$stderr = 'Execution failed.';
|
||||
}
|
||||
|
||||
$executionEnd = \microtime(true);
|
||||
$executionTime = ($executionEnd - $executionStart);
|
||||
$functionStatus = ($statusCode >= 200 && $statusCode < 300) ? 'completed' : 'failed';
|
||||
|
||||
Console::success('Function executed in ' . $executionTime . ' seconds, status: ' . $functionStatus);
|
||||
|
||||
$execution = [
|
||||
'status' => $functionStatus,
|
||||
'statusCode' => $statusCode,
|
||||
'stdout' => \utf8_encode(\mb_substr($stdout, -8000)),
|
||||
'stderr' => \utf8_encode(\mb_substr($stderr, -8000)),
|
||||
'time' => $executionTime,
|
||||
];
|
||||
} catch (\Throwable $th) {
|
||||
|
||||
}
|
||||
|
||||
$response
|
||||
->setStatusCode(Response::STATUS_CODE_OK)
|
||||
|
@ -710,10 +723,6 @@ App::error(function ($error, $utopia, $request, $response) {
|
|||
/** @var Utopia\Swoole\Request $request */
|
||||
/** @var Appwrite\Utopia\Response $response */
|
||||
|
||||
if ($error instanceof PDOException) {
|
||||
throw $error;
|
||||
}
|
||||
|
||||
$route = $utopia->match($request);
|
||||
// logError($error, "httpError", $route);
|
||||
|
||||
|
@ -744,6 +753,17 @@ App::error(function ($error, $utopia, $request, $response) {
|
|||
$response->json($output);
|
||||
}, ['error', 'utopia', 'request', 'response']);
|
||||
|
||||
App::init(function ($request, $response) {
|
||||
$secretKey = $request->getHeader('x-appwrite-executor-key', '');
|
||||
if (empty($secretKey)) {
|
||||
throw new Exception('Missing executor key', 401);
|
||||
}
|
||||
|
||||
if ($secretKey !== App::getEnv('_APP_EXECUTOR_SECRET', '')) {
|
||||
throw new Exception('Missing executor key', 401);
|
||||
}
|
||||
}, ['request', 'response']);
|
||||
|
||||
$http->on('start', function ($http) {
|
||||
@Process::signal(SIGINT, function () use ($http) {
|
||||
// handleShutdown();
|
||||
|
@ -771,24 +791,19 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
|
|||
$response = new Response($swooleResponse);
|
||||
$app = new App('UTC');
|
||||
|
||||
// Check environment variable key
|
||||
$secretKey = $request->getHeader('x-appwrite-executor-key', '');
|
||||
|
||||
if (empty($secretKey)) {
|
||||
$swooleResponse->status(401);
|
||||
return $swooleResponse->end('401: Authentication Error');
|
||||
}
|
||||
|
||||
if ($secretKey !== App::getEnv('_APP_EXECUTOR_SECRET', '')) {
|
||||
$swooleResponse->status(401);
|
||||
return $swooleResponse->end('401: Authentication Error');
|
||||
}
|
||||
|
||||
try {
|
||||
$app->run($request, $response);
|
||||
} catch (Exception $e) {
|
||||
} catch (\Throwable $th) {
|
||||
// logError($e, "serverError");
|
||||
$swooleResponse->end('500: Server Error');
|
||||
$swooleResponse->setStatusCode(500);
|
||||
$output = [
|
||||
'message' => 'Error: '. $th->getMessage(),
|
||||
'code' => 500,
|
||||
'file' => $th->getFile(),
|
||||
'line' => $th->getLine(),
|
||||
'trace' => $th->getTrace()
|
||||
];
|
||||
$swooleResponse->end(\json_encode($output));
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -394,6 +394,8 @@ services:
|
|||
networks:
|
||||
appwrite:
|
||||
runtimes:
|
||||
ports:
|
||||
- 9509:8080
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
- appwrite-functions:/storage/functions:rw
|
||||
|
|
|
@ -59,7 +59,7 @@ class Executor
|
|||
|
||||
$status = $response['headers']['status-code'];
|
||||
if ($status >= 400) {
|
||||
throw new \Exception('Error creating build: ', $status);
|
||||
throw new \Exception($response['body'], $status);
|
||||
}
|
||||
|
||||
return $response['body'];
|
||||
|
|
Loading…
Reference in a new issue