1
0
Fork 0
mirror of synced 2024-07-01 12:40:34 +12:00

feat: refactor some of the executor

This commit is contained in:
Torsten Dittmann 2022-01-21 11:42:12 +01:00
parent 2e3c904385
commit 9ab6b695e6
3 changed files with 254 additions and 297 deletions

View file

@ -31,20 +31,20 @@ use Swoole\Coroutine as Co;
use Utopia\Cache\Cache; use Utopia\Cache\Cache;
use Utopia\Database\Query; use Utopia\Database\Query;
use Utopia\Orchestration\Adapter\DockerCLI; use Utopia\Orchestration\Adapter\DockerCLI;
use Utopia\Validator\Boolean;
use Utopia\Logger\Log; use Utopia\Logger\Log;
require_once __DIR__ . '/init.php'; require_once __DIR__ . '/init.php';
Authorization::disable();
Swoole\Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL); Swoole\Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL);
global $register; function logError(Throwable $error, string $action, Utopia\Route $route = null)
$logError = function(Throwable $error, string $action, Utopia\Route $route = null) use ($register) { {
global $register;
$logger = $register->get('logger'); $logger = $register->get('logger');
var_dump($error->getTraceAsString()); if ($logger) {
if($logger) {
$version = App::getEnv('_APP_VERSION', 'UNKNOWN'); $version = App::getEnv('_APP_VERSION', 'UNKNOWN');
$log = new Log(); $log = new Log();
@ -54,7 +54,7 @@ $logError = function(Throwable $error, string $action, Utopia\Route $route = nul
$log->setType(Log::TYPE_ERROR); $log->setType(Log::TYPE_ERROR);
$log->setMessage($error->getMessage()); $log->setMessage($error->getMessage());
if($route) { if ($route) {
$log->addTag('method', $route->getMethod()); $log->addTag('method', $route->getMethod());
$log->addTag('url', $route->getPath()); $log->addTag('url', $route->getPath());
} }
@ -72,7 +72,7 @@ $logError = function(Throwable $error, string $action, Utopia\Route $route = nul
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING); $log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
$responseCode = $logger->addLog($log); $responseCode = $logger->addLog($log);
Console::info('Executor log pushed with status code: '.$responseCode); Console::info('Executor log pushed with status code: ' . $responseCode);
} }
Console::error('[Error] Type: ' . get_class($error)); Console::error('[Error] Type: ' . get_class($error));
@ -119,12 +119,12 @@ try {
$residueList = $orchestration->list(['label' => 'appwrite-type=function']); $residueList = $orchestration->list(['label' => 'appwrite-type=function']);
foreach ($residueList as $value) { foreach ($residueList as $value) {
$activeFunctions->set($value->getName(), [ go(fn () => $activeFunctions->set($value->getName(), [
'id' => $value->getId(), 'id' => $value->getId(),
'name' => $value->getName(), 'name' => $value->getName(),
'status' => $value->getStatus(), 'status' => $value->getStatus(),
'private-key' => '' 'private-key' => ''
]); ]));
} }
$executionEnd = \microtime(true); $executionEnd = \microtime(true);
@ -135,26 +135,21 @@ try {
call_user_func($logError, $error, "startupError"); call_user_func($logError, $error, "startupError");
} }
$createRuntimeServer = function(string $functionId, string $projectId, string $tagId, Database $database) use($logError): void function createRuntimeServer(string $functionId, string $projectId, string $tagId, Database $database): void
{ {
global $orchestration; global $orchestration;
global $runtimes; global $runtimes;
global $activeFunctions; global $activeFunctions;
// Grab Function Document $function = $database->getDocument('functions', $functionId);
/** @var Document $function */ $tag = $database->getDocument('tags', $tagId);
$function = Authorization::skip(fn () => $database->getDocument('functions', $functionId));
/** @var Document $tag */
$tag = Authorization::skip(fn () => $database->getDocument('tags', $tagId));
if ($tag->getAttribute('buildId') === null) { if ($tag->getAttribute('buildId') === null) {
throw new Exception('Tag has no buildId'); throw new Exception('Tag has no buildId');
} }
// Grab Build Document // Grab Build Document
$build = Authorization::skip(function () use ($database, $tag) { $build = $database->getDocument('builds', $tag->getAttribute('buildId'));
return $database->getDocument('builds', $tag->getAttribute('buildId'));
});
// Check if function isn't already created // Check if function isn't already created
$functions = $orchestration->list(['label' => 'appwrite-type=function', 'name' => 'appwrite-function-' . $tag->getId()]); $functions = $orchestration->list(['label' => 'appwrite-type=function', 'name' => 'appwrite-function-' . $tag->getId()]);
@ -167,9 +162,7 @@ $createRuntimeServer = function(string $functionId, string $projectId, string $t
$secret = \bin2hex(\random_bytes(16)); $secret = \bin2hex(\random_bytes(16));
// Check if runtime is active // Check if runtime is active
$runtime = (isset($runtimes[$function->getAttribute('runtime', '')])) $runtime = $runtimes[$function->getAttribute('runtime', '')] ?? null;
? $runtimes[$function->getAttribute('runtime', '')]
: null;
if ($tag->getAttribute('functionId') !== $function->getId()) { if ($tag->getAttribute('functionId') !== $function->getId()) {
throw new Exception('Tag not found', 404); throw new Exception('Tag not found', 404);
@ -202,7 +195,7 @@ $createRuntimeServer = function(string $functionId, string $projectId, string $t
try { try {
throw new Exception('Failed to remove container: ' . $e->getMessage()); throw new Exception('Failed to remove container: ' . $e->getMessage());
} catch (Throwable $error) { } catch (Throwable $error) {
call_user_func($logError, $error, "createRuntimeServer"); logError($error, "createRuntimeServer");
} }
} }
@ -210,7 +203,7 @@ $createRuntimeServer = function(string $functionId, string $projectId, string $t
} }
// Check if tag hasn't failed // Check if tag hasn't failed
if ($build->getAttribute('status') == 'failed') { if ($build->getAttribute('status') === 'failed') {
throw new Exception('Tag build failed, please check your logs.', 500); throw new Exception('Tag build failed, please check your logs.', 500);
} }
@ -258,9 +251,10 @@ $createRuntimeServer = function(string $functionId, string $projectId, string $t
$executionStart = \microtime(true); $executionStart = \microtime(true);
$executionTime = \time(); $executionTime = \time();
$orchestration->setCpus(App::getEnv('_APP_FUNCTIONS_CPUS', '1')); $orchestration
$orchestration->setMemory(App::getEnv('_APP_FUNCTIONS_MEMORY', '256')); ->setCpus(App::getEnv('_APP_FUNCTIONS_CPUS', '1'))
$orchestration->setSwap(App::getEnv('_APP_FUNCTIONS_MEMORY_SWAP', '256')); ->setMemory(App::getEnv('_APP_FUNCTIONS_MEMORY', '256'))
->setSwap(App::getEnv('_APP_FUNCTIONS_MEMORY_SWAP', '256'));
foreach ($vars as $key => $value) { foreach ($vars as $key => $value) {
$vars[$key] = strval($value); $vars[$key] = strval($value);
@ -304,7 +298,7 @@ $createRuntimeServer = function(string $functionId, string $projectId, string $t
} }
}; };
$execute = function(string $trigger, string $projectId, string $executionId, string $functionId, Database $database, string $event = '', string $eventData = '', string $data = '', array $webhooks = [], string $userId = '', string $jwt = '') use($logError, $createRuntimeServer): array function execute(string $trigger, string $projectId, string $executionId, string $functionId, Database $database, string $event = '', string $eventData = '', string $data = '', array $webhooks = [], string $userId = '', string $jwt = ''): array
{ {
Console::info('Executing function: ' . $functionId); Console::info('Executing function: ' . $functionId);
@ -312,19 +306,9 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
global $runtimes; global $runtimes;
global $register; global $register;
// Grab Tag Document $function = $database->getDocument('functions', $functionId);
$function = Authorization::skip(function () use ($database, $functionId) { $tag = $database->getDocument('tags', $function->getAttribute('tag', ''));
return $database->getDocument('functions', $functionId); $build = $database->getDocument('builds', $tag->getAttribute('buildId', ''));
});
$tag = Authorization::skip(function () use ($database, $function) {
return $database->getDocument('tags', $function->getAttribute('tag', ''));
});
// Grab Build Document
$build = Authorization::skip(function () use ($database, $tag) {
return $database->getDocument('builds', $tag->getAttribute('buildId', ''));
});
if ($tag->getAttribute('functionId') !== $function->getId()) { if ($tag->getAttribute('functionId') !== $function->getId()) {
throw new Exception('Tag not found', 404); throw new Exception('Tag not found', 404);
@ -332,8 +316,9 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
// Grab execution document if exists // Grab execution document if exists
// It it doesn't exist, create a new one. // It it doesn't exist, create a new one.
$execution = Authorization::skip(function () use ($database, $executionId, $userId, $function, $tag, $trigger, $functionId) { $execution = !empty($executionId)
return (!empty($executionId)) ? $database->getDocument('executions', $executionId) : $database->createDocument('executions', new Document([ ? $database->getDocument('executions', $executionId)
: $database->createDocument('executions', new Document([
'$id' => $executionId, '$id' => $executionId,
'$read' => (!$userId == '') ? ['user:' . $userId] : [], '$read' => (!$userId == '') ? ['user:' . $userId] : [],
'$write' => ['role:all'], '$write' => ['role:all'],
@ -348,7 +333,6 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
'time' => 0.0, 'time' => 0.0,
'search' => implode(' ', [$functionId, $executionId]), 'search' => implode(' ', [$functionId, $executionId]),
])); ]));
});
if (false === $execution || ($execution instanceof Document && $execution->isEmpty())) { if (false === $execution || ($execution instanceof Document && $execution->isEmpty())) {
throw new Exception('Failed to create or read execution'); throw new Exception('Failed to create or read execution');
@ -357,22 +341,19 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
if ($build->getAttribute('status') == 'building') { if ($build->getAttribute('status') == 'building') {
$execution->setAttribute('status', 'failed') $execution
->setAttribute('status', 'failed')
->setAttribute('statusCode', 500) ->setAttribute('statusCode', 500)
->setAttribute('stderr', 'Tag is still being built.') ->setAttribute('stderr', 'Tag is still being built.')
->setAttribute('time', 0); ->setAttribute('time', 0);
Authorization::skip(function () use ($database, $execution) { $database->updateDocument('executions', $execution->getId(), $execution);
return $database->updateDocument('executions', $execution->getId(), $execution);
});
throw new Exception('Execution Failed. Reason: Tag is still being built.'); throw new Exception('Execution Failed. Reason: Tag is still being built.');
} }
// Check if runtime is active // Check if runtime is active
$runtime = (isset($runtimes[$function->getAttribute('runtime', '')])) $runtime = $runtimes[$function->getAttribute('runtime', '')] ?? null;
? $runtimes[$function->getAttribute('runtime', '')]
: null;
if (\is_null($runtime)) { if (\is_null($runtime)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
@ -402,37 +383,34 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
if ($build->getAttribute('status') !== 'ready') { if ($build->getAttribute('status') !== 'ready') {
// Create a new build entry // Create a new build entry
$buildId = $database->getId(); $buildId = $database->getId();
Authorization::skip(function () use ($buildId, $database, $tag, $userId, $runtime, $function, $projectId) { $database->createDocument('builds', new Document([
$database->createDocument('builds', new Document([ '$id' => $buildId,
'$id' => $buildId, '$read' => (!$userId == '') ? ['user:' . $userId] : [],
'$read' => (!$userId == '') ? ['user:' . $userId] : [], '$write' => ['role:all'],
'$write' => ['role:all'], 'dateCreated' => time(),
'dateCreated' => time(), 'status' => 'processing',
'status' => 'processing', 'outputPath' => '',
'outputPath' => '', 'runtime' => $function->getAttribute('runtime', ''),
'runtime' => $function->getAttribute('runtime', ''), 'source' => $tag->getAttribute('path'),
'source' => $tag->getAttribute('path'), 'sourceType' => Storage::DEVICE_LOCAL,
'sourceType' => Storage::DEVICE_LOCAL, 'stdout' => '',
'stdout' => '', 'stderr' => '',
'stderr' => '', 'buildTime' => 0,
'buildTime' => 0, 'envVars' => [
'envVars' => [ 'ENTRYPOINT_NAME' => $tag->getAttribute('entrypoint'),
'ENTRYPOINT_NAME' => $tag->getAttribute('entrypoint'), 'APPWRITE_FUNCTION_ID' => $function->getId(),
'APPWRITE_FUNCTION_ID' => $function->getId(), 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''), 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'],
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'], 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'],
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'], 'APPWRITE_FUNCTION_PROJECT_ID' => $projectId,
'APPWRITE_FUNCTION_PROJECT_ID' => $projectId, ]
] ]));
]));
$tag->setAttribute('buildId', $buildId); $tag->setAttribute('buildId', $buildId);
$database->updateDocument('tags', $tag->getId(), $tag); $database->updateDocument('tags', $tag->getId(), $tag);
});
runBuildStage($buildId, $projectId, $database); runBuildStage($buildId, $projectId, $database);
sleep(1);
} }
} catch (Exception $e) { } catch (Exception $e) {
$execution->setAttribute('status', 'failed') $execution->setAttribute('status', 'failed')
@ -440,16 +418,14 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
->setAttribute('stderr', \utf8_encode(\mb_substr($e->getMessage(), -4000))) // log last 4000 chars output ->setAttribute('stderr', \utf8_encode(\mb_substr($e->getMessage(), -4000))) // log last 4000 chars output
->setAttribute('time', 0); ->setAttribute('time', 0);
Authorization::skip(function () use ($database, $execution) { $database->updateDocument('executions', $execution->getId(), $execution);
return $database->updateDocument('executions', $execution->getId(), $execution);
});
throw new Error('Something went wrong building the code. ' . $e->getMessage()); throw new Error('Something went wrong building the code. ' . $e->getMessage());
} }
try { try {
if (!$activeFunctions->exists($container)) { // Create contianer if not ready if (!$activeFunctions->exists($container)) { // Create contianer if not ready
$createRuntimeServer($functionId, $projectId, $tag->getId(), $database); createRuntimeServer($functionId, $projectId, $tag->getId(), $database);
} else if ($activeFunctions->get($container)['status'] === 'Down') { } else if ($activeFunctions->get($container)['status'] === 'Down') {
sleep(1); sleep(1);
} else { } else {
@ -461,14 +437,12 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
->setAttribute('stderr', \utf8_encode(\mb_substr($e->getMessage(), -4000))) // log last 4000 chars output ->setAttribute('stderr', \utf8_encode(\mb_substr($e->getMessage(), -4000))) // log last 4000 chars output
->setAttribute('time', 0); ->setAttribute('time', 0);
$execution = Authorization::skip(function () use ($database, $execution) { $execution = $database->updateDocument('executions', $execution->getId(), $execution);
return $database->updateDocument('executions', $execution->getId(), $execution);
});
try { try {
throw new Exception('Something went wrong building the runtime server. ' . $e->getMessage()); throw new Exception('Something went wrong building the runtime server. ' . $e->getMessage());
} catch (\Exception $error) { } catch (\Exception $error) {
call_user_func($logError, $error, "execution"); logError($error, 'execution');
} }
return [ return [
@ -561,7 +535,7 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
} }
// If timeout error // If timeout error
if ($errNo == CURLE_OPERATION_TIMEDOUT || $errNo == 110) { if (in_array($errNo, [CURLE_OPERATION_TIMEDOUT, 110])) {
$statusCode = 124; $statusCode = 124;
} }
@ -609,9 +583,7 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
->setAttribute('stderr', \utf8_encode(\mb_substr($stderr, -8000))) ->setAttribute('stderr', \utf8_encode(\mb_substr($stderr, -8000)))
->setAttribute('time', $executionTime); ->setAttribute('time', $executionTime);
$execution = Authorization::skip(function () use ($database, $execution) { $execution = $database->updateDocument('executions', $execution->getId(), $execution);
return $database->updateDocument('executions', $execution->getId(), $execution);
});
$executionModel = new Execution(); $executionModel = new Execution();
$executionUpdate = new Event('v1-webhooks', 'WebhooksV1'); $executionUpdate = new Event('v1-webhooks', 'WebhooksV1');
@ -662,7 +634,6 @@ $execute = function(string $trigger, string $projectId, string $executionId, str
App::post('/v1/execute') // Define Route App::post('/v1/execute') // Define Route
->desc('Execute a function') ->desc('Execute a function')
->inject('request')
->param('trigger', '', new Text(1024)) ->param('trigger', '', new Text(1024))
->param('projectId', '', new Text(1024)) ->param('projectId', '', new Text(1024))
->param('executionId', '', new Text(1024), '', true) ->param('executionId', '', new Text(1024), '', true)
@ -676,12 +647,12 @@ App::post('/v1/execute') // Define Route
->inject('response') ->inject('response')
->inject('dbForProject') ->inject('dbForProject')
->action( ->action(
function ($trigger, $projectId, $executionId, $functionId, $event, $eventData, $data, $webhooks, $userId, $jwt, $request, $response, $dbForProject) use ($execute, $logError) { function (string $trigger, string $projectId, string $executionId, string $functionId, string $event, string $eventData, string $data, array $webhooks, string $userId, string $jwt, Response $response, Database $dbForProject) {
try { try {
$data = $execute($trigger, $projectId, $executionId, $functionId, $dbForProject, $event, $eventData, $data, $webhooks, $userId, $jwt); $data = execute($trigger, $projectId, $executionId, $functionId, $dbForProject, $event, $eventData, $data, $webhooks, $userId, $jwt);
$response->json($data); $response->json($data);
} catch (Exception $e) { } catch (Exception $e) {
call_user_func($logError, $e, "executeEndpoint"); logError($e, 'executeEndpoint');
$response $response
->addHeader('Cache-Control', 'no-cache, no-store, must-revalidate') ->addHeader('Cache-Control', 'no-cache, no-store, must-revalidate')
@ -698,84 +669,62 @@ App::post('/v1/cleanup/function')
->param('functionId', '', new UID()) ->param('functionId', '', new UID())
->inject('response') ->inject('response')
->inject('dbForProject') ->inject('dbForProject')
->inject('projectID') ->action(
->action(function ($functionId, $response, $dbForProject, $projectID) use($logError) { function (string $functionId, Response $response, Database $dbForProject) use ($orchestration) {
/** @var string $functionId */ try {
/** @var Appwrite\Utopia\Response $response */ // Get function document
/** @var Utopia\Database\Database $dbForProject */ $function = $dbForProject->getDocument('functions', $functionId);
/** @var string $projectID */
global $orchestration; // Check if function exists
if ($function->isEmpty()) {
try { throw new Exception('Function not found', 404);
// Get function document
$function = Authorization::skip(function () use ($dbForProject, $functionId) {
return $dbForProject->getDocument('functions', $functionId);
});
// Check if function exists
if ($function->isEmpty()) {
throw new Exception('Function not found', 404);
}
$results = Authorization::skip(function () use ($dbForProject, $functionId) {
return $dbForProject->find('tags', [new Query('functionId', Query::TYPE_EQUAL, [$functionId])], 999);
});
// If amount is 0 then we simply return true
if (count($results) === 0) {
return $response->json(['success' => true]);
}
// Delete the containers of all tags
foreach ($results as $tag) {
try {
// Remove any ongoing builds
if ($tag->getAttribute('buildId')) {
$build = Authorization::skip(function () use ($dbForProject, $tag) {
return $dbForProject->getDocument('builds', $tag->getAttribute('buildId'));
});
if ($build->getAttribute('status') == 'building') {
// Remove the build
$orchestration->remove('build-stage-' . $tag->getAttribute('buildId'), true);
Console::info('Removed build for tag ' . $tag['$id']);
}
}
$orchestration->remove('appwrite-function-' . $tag['$id'], true);
Console::info('Removed container for tag ' . $tag['$id']);
} catch (Exception $e) {
// Do nothing, we don't care that much if it fails
} }
$results = $dbForProject->find('tags', [new Query('functionId', Query::TYPE_EQUAL, [$functionId])], 999);
// If amount is 0 then we simply return true
if (count($results) === 0) {
return $response->json(['success' => true]);
}
// Delete the containers of all tags
foreach ($results as $tag) {
try {
// Remove any ongoing builds
if ($tag->getAttribute('buildId')) {
$build = $dbForProject->getDocument('builds', $tag->getAttribute('buildId'));
if ($build->getAttribute('status') == 'building') {
// Remove the build
$orchestration->remove('build-stage-' . $tag->getAttribute('buildId'), true);
Console::info('Removed build for tag ' . $tag['$id']);
}
}
$orchestration->remove('appwrite-function-' . $tag['$id'], true);
Console::info('Removed container for tag ' . $tag['$id']);
} catch (Exception $e) {
// Do nothing, we don't care that much if it fails
}
}
return $response->json(['success' => true]);
} catch (Exception $e) {
logError($e, "cleanupFunction");
return $response->json(['error' => $e->getMessage()]);
} }
return $response->json(['success' => true]);
} catch (Exception $e) {
call_user_func($logError, $e, "cleanupFunction");
return $response->json(['error' => $e->getMessage()]);
} }
}); );
App::post('/v1/cleanup/tag') App::post('/v1/cleanup/tag')
->param('tagId', '', new UID(), 'Tag unique ID.') ->param('tagId', '', new UID(), 'Tag unique ID.')
->inject('response') ->inject('response')
->inject('dbForProject') ->inject('dbForProject')
->inject('projectID') ->action(function (string $tagId, Response $response, Database $dbForProject) use ($orchestration) {
->action(function ($tagId, $response, $dbForProject, $projectID) use($logError) {
/** @var string $tagId */
/** @var Appwrite\Utopia\Response $response */
/** @var Appwrite\Database\Database $dbForProject */
/** @var string $projectID */
global $orchestration;
try { try {
// Get tag document // Get tag document
$tag = Authorization::skip(function () use ($dbForProject, $tagId) { $tag = $dbForProject->getDocument('tags', $tagId);
return $dbForProject->getDocument('tags', $tagId);
});
// Check if tag exists // Check if tag exists
if ($tag->isEmpty()) { if ($tag->isEmpty()) {
@ -785,9 +734,7 @@ App::post('/v1/cleanup/tag')
try { try {
// Remove any ongoing builds // Remove any ongoing builds
if ($tag->getAttribute('buildId')) { if ($tag->getAttribute('buildId')) {
$build = Authorization::skip(function () use ($dbForProject, $tag) { $build = $dbForProject->getDocument('builds', $tag->getAttribute('buildId'));
return $dbForProject->getDocument('builds', $tag->getAttribute('buildId'));
});
if ($build->getAttribute('status') == 'building') { if ($build->getAttribute('status') == 'building') {
// Remove the build // Remove the build
@ -803,7 +750,7 @@ App::post('/v1/cleanup/tag')
// Do nothing, we don't care that much if it fails // Do nothing, we don't care that much if it fails
} }
} catch (Exception $e) { } catch (Exception $e) {
call_user_func($logError, $e, "cleanupFunction"); logError($e, "cleanupFunction");
return $response->json(['error' => $e->getMessage()]); return $response->json(['error' => $e->getMessage()]);
} }
@ -814,18 +761,14 @@ App::post('/v1/tag')
->param('functionId', '', new UID(), 'Function unique ID.') ->param('functionId', '', new UID(), 'Function unique ID.')
->param('tagId', '', new UID(), 'Tag unique ID.') ->param('tagId', '', new UID(), 'Tag unique ID.')
->param('userId', '', new UID(), 'User unique ID.', true) ->param('userId', '', new UID(), 'User unique ID.', true)
->param('autoDeploy', false, new Boolean(), '', true)
->inject('response') ->inject('response')
->inject('dbForProject') ->inject('dbForProject')
->inject('projectID') ->inject('projectID')
->action(function ($functionId, $tagId, $userId, $autoDeploy, $response, $dbForProject, $projectID) use ($createRuntimeServer) { ->action(function (string $functionId, string $tagId, string $userId, Response $response, Database $dbForProject, string $projectID) use ($runtimes) {
/** @var Utopia\Database\Database $dbForProject */
global $runtimes;
// Get function document // Get function document
$function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId)); $function = $dbForProject->getDocument('functions', $functionId);
// Get tag document // Get tag document
$tag = Authorization::skip(fn () => $dbForProject->getDocument('tags', $tagId)); $tag = $dbForProject->getDocument('tags', $tagId);
// Check if both documents exist // Check if both documents exist
if ($function->isEmpty()) { if ($function->isEmpty()) {
@ -833,13 +776,14 @@ App::post('/v1/tag')
} }
if ($tag->isEmpty()) { if ($tag->isEmpty()) {
var_dump($tag->getArrayCopy());
throw new Exception('Tag not found', 404); throw new Exception('Tag not found', 404);
} }
$runtime = (isset($runtimes[$function->getAttribute('runtime')])) $runtime = $runtimes[$function->getAttribute('runtime')] ?? null;
? $runtimes[$function->getAttribute('runtime')]
: null; if (\is_null($runtime)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
}
// Create a new build entry // Create a new build entry
$buildId = $dbForProject->getId(); $buildId = $dbForProject->getId();
@ -847,43 +791,41 @@ App::post('/v1/tag')
if ($tag->getAttribute('buildId')) { if ($tag->getAttribute('buildId')) {
$buildId = $tag->getAttribute('buildId'); $buildId = $tag->getAttribute('buildId');
} else { } else {
Authorization::skip(function () use ($buildId, $dbForProject, $tag, $userId, $function, $projectID, $runtime) { try {
try { $dbForProject->createDocument('builds', new Document([
$dbForProject->createDocument('builds', new Document([ '$id' => $buildId,
'$id' => $buildId, '$read' => (!empty($userId)) ? ['user:' . $userId] : [],
'$read' => (!empty($userId)) ? ['user:' . $userId] : [], '$write' => ['role:all'],
'$write' => ['role:all'], 'dateCreated' => time(),
'dateCreated' => time(), 'status' => 'processing',
'status' => 'processing', 'runtime' => $function->getAttribute('runtime'),
'runtime' => $function->getAttribute('runtime'), 'outputPath' => '',
'outputPath' => '', 'source' => $tag->getAttribute('path'),
'source' => $tag->getAttribute('path'), 'sourceType' => Storage::DEVICE_LOCAL,
'sourceType' => Storage::DEVICE_LOCAL, 'stdout' => '',
'stdout' => '', 'stderr' => '',
'stderr' => '', 'buildTime' => 0,
'buildTime' => 0, 'envVars' => [
'envVars' => [ 'ENTRYPOINT_NAME' => $tag->getAttribute('entrypoint'),
'ENTRYPOINT_NAME' => $tag->getAttribute('entrypoint'), 'APPWRITE_FUNCTION_ID' => $function->getId(),
'APPWRITE_FUNCTION_ID' => $function->getId(), 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''), 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'],
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'], 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'],
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'], 'APPWRITE_FUNCTION_PROJECT_ID' => $projectID,
'APPWRITE_FUNCTION_PROJECT_ID' => $projectID, ]
] ]));
]));
$tag->setAttribute('buildId', $buildId); $tag->setAttribute('buildId', $buildId);
$dbForProject->updateDocument('tags', $tag->getId(), $tag); $dbForProject->updateDocument('tags', $tag->getId(), $tag);
} catch (\Throwable $th) { } catch (\Throwable $th) {
var_dump($tag->getArrayCopy()); var_dump($tag->getArrayCopy());
throw $th; throw $th;
} }
});
} }
// Build Code // Build Code
go(function () use ($dbForProject, $projectID, $tagId, $buildId, $functionId, $function, $createRuntimeServer) { go(function () use ($dbForProject, $projectID, $tagId, $buildId, $functionId, $function) {
// Build Code // Build Code
runBuildStage($buildId, $projectID, $dbForProject); runBuildStage($buildId, $projectID, $dbForProject);
@ -893,14 +835,10 @@ App::post('/v1/tag')
$next = (empty($function->getAttribute('tag')) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : 0; $next = (empty($function->getAttribute('tag')) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : 0;
// Grab tag // Grab tag
$tag = Authorization::skip(function () use ($dbForProject, $tagId, $next, $buildId) { $tag = $dbForProject->getDocument('tags', $tagId);
return $dbForProject->getDocument('tags', $tagId);
});
// Grab build // Grab build
$build = Authorization::skip(function () use ($dbForProject, $buildId) { $build = $dbForProject->getDocument('builds', $buildId);
return $dbForProject->getDocument('builds', $buildId);
});
// If the build failed, it won't be possible to deploy // If the build failed, it won't be possible to deploy
if ($build->getAttribute('status') !== 'ready') { if ($build->getAttribute('status') !== 'ready') {
@ -909,16 +847,12 @@ App::post('/v1/tag')
if ($tag->getAttribute('automaticDeploy') === true) { if ($tag->getAttribute('automaticDeploy') === true) {
// Update the function document setting the tag as the active one // Update the function document setting the tag as the active one
$function = Authorization::skip(function () use ($function, $dbForProject, $tag, $next) { $function->setAttribute('tag', $tag->getId())->setAttribute('scheduleNext', (int)$next);
return $function = $dbForProject->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [ $function = $dbForProject->updateDocument('functions', $function->getId(), $function);
'tag' => $tag->getId(),
'scheduleNext' => (int)$next,
])));
});
} }
// Deploy Runtime Server // Deploy Runtime Server
$createRuntimeServer($functionId, $projectID, $tagId, $dbForProject); createRuntimeServer($functionId, $projectID, $tagId, $dbForProject);
}); });
if (false === $function) { if (false === $function) {
@ -929,9 +863,8 @@ App::post('/v1/tag')
}); });
App::get('/v1/') App::get('/v1/')
->inject('request')
->inject('response') ->inject('response')
->action(function ($request, $response) { ->action(function (Response $response) {
$response $response
->addHeader('Cache-Control', 'no-cache, no-store, must-revalidate') ->addHeader('Cache-Control', 'no-cache, no-store, must-revalidate')
->addHeader('Expires', '0') ->addHeader('Expires', '0')
@ -946,17 +879,10 @@ App::post('/v1/build/:buildId') // Start a Build
->inject('response') ->inject('response')
->inject('dbForProject') ->inject('dbForProject')
->inject('projectID') ->inject('projectID')
->action(function ($buildId, $response, $dbForProject, $projectID) use ($logError) { ->action(function (string $buildId, Response $response, Database $dbForProject, string $projectID) {
/** @var string $buildId */
/** @var Appwrite\Utopia\Response $response */
/** @var Utopia\Database\Database $dbForProject */
/** @var string $projectID */
try { try {
// Get build document // Get build document
$build = Authorization::skip(function () use ($buildId, $dbForProject) { $build = $dbForProject->getDocument('builds', $buildId);
return $dbForProject->getDocument('builds', $buildId);
});
// Check if build exists // Check if build exists
if ($build->isEmpty()) { if ($build->isEmpty()) {
@ -981,7 +907,7 @@ App::post('/v1/build/:buildId') // Start a Build
// return success // return success
return $response->json(['success' => true]); return $response->json(['success' => true]);
} catch (Exception $e) { } catch (Exception $e) {
call_user_func($logError, $e, "buildEndpoint"); logError($e, "buildEndpoint");
$response $response
->addHeader('Cache-Control', 'no-cache, no-store, must-revalidate') ->addHeader('Cache-Control', 'no-cache, no-store, must-revalidate')
@ -1006,12 +932,10 @@ function runBuildStage(string $buildId, string $projectID, Database $database):
$database = new Database(new MariaDB($db), $cache); $database = new Database(new MariaDB($db), $cache);
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite')); $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$database->setNamespace('_project_'.$projectID); $database->setNamespace('_project_' . $projectID);
// Check if build has already been run // Check if build has already been run
$build = Authorization::skip(function () use ($buildId, $database) { $build = $database->getDocument('builds', $buildId);
return $database->getDocument('builds', $buildId);
});
try { try {
// If we already have a built package ready there is no need to rebuild. // If we already have a built package ready there is no need to rebuild.
@ -1022,14 +946,10 @@ function runBuildStage(string $buildId, string $projectID, Database $database):
// Update Tag Status // Update Tag Status
$build->setAttribute('status', 'building'); $build->setAttribute('status', 'building');
Authorization::skip(function () use ($build, $database) { $database->updateDocument('builds', $build->getId(), $build);
$database->updateDocument('builds', $build->getId(), $build);
});
// Check if runtime is active // Check if runtime is active
$runtime = (isset($runtimes[$build->getAttribute('runtime', '')])) $runtime = $runtimes[$build->getAttribute('runtime', '')] ?? null;
? $runtimes[$build->getAttribute('runtime', '')]
: null;
if (\is_null($runtime)) { if (\is_null($runtime)) {
throw new Exception('Runtime "' . $build->getAttribute('runtime', '') . '" is not supported'); throw new Exception('Runtime "' . $build->getAttribute('runtime', '') . '" is not supported');
@ -1074,9 +994,10 @@ function runBuildStage(string $buildId, string $projectID, Database $database):
$buildStart = \microtime(true); $buildStart = \microtime(true);
$buildTime = \time(); $buildTime = \time();
$orchestration->setCpus(App::getEnv('_APP_FUNCTIONS_CPUS', 0)); $orchestration
$orchestration->setMemory(App::getEnv('_APP_FUNCTIONS_MEMORY', 256)); ->setCpus(App::getEnv('_APP_FUNCTIONS_CPUS', 0))
$orchestration->setSwap(App::getEnv('_APP_FUNCTIONS_MEMORY_SWAP', 256)); ->setMemory(App::getEnv('_APP_FUNCTIONS_MEMORY', 256))
->setSwap(App::getEnv('_APP_FUNCTIONS_MEMORY_SWAP', 256));
foreach ($vars as &$value) { foreach ($vars as &$value) {
$value = strval($value); $value = strval($value);
@ -1203,25 +1124,26 @@ function runBuildStage(string $buildId, string $projectID, Database $database):
$buildStdout = 'Build Successful!'; $buildStdout = 'Build Successful!';
} }
$build->setAttribute('outputPath', $path) $build
->setAttribute('outputPath', $path)
->setAttribute('status', 'ready') ->setAttribute('status', 'ready')
->setAttribute('stdout', \utf8_encode(\mb_substr($buildStdout, -4096))) ->setAttribute('stdout', \utf8_encode(\mb_substr($buildStdout, -4096)))
->setAttribute('stderr', \utf8_encode(\mb_substr($buildStderr, -4096))) ->setAttribute('stderr', \utf8_encode(\mb_substr($buildStderr, -4096)))
->setAttribute('buildTime', $buildTime); ->setAttribute('buildTime', $buildTime);
// Update build with built code attribute // Update build with built code attribute
$build = Authorization::skip(fn () => $database->updateDocument('builds', $buildId, $build)); $build = $database->updateDocument('builds', $buildId, $build);
$buildEnd = \microtime(true); $buildEnd = \microtime(true);
Console::info('Build Stage Ran in ' . ($buildEnd - $buildStart) . ' seconds'); Console::info('Build Stage Ran in ' . ($buildEnd - $buildStart) . ' seconds');
} catch (Exception $e) { } catch (Exception $e) {
var_dump($e->getTraceAsString()); $build
$build->setAttribute('status', 'failed') ->setAttribute('status', 'failed')
->setAttribute('stdout', \utf8_encode(\mb_substr($buildStdout, -4096))) ->setAttribute('stdout', \utf8_encode(\mb_substr($buildStdout, -4096)))
->setAttribute('stderr', \utf8_encode(\mb_substr($e->getMessage(), -4096))); ->setAttribute('stderr', \utf8_encode(\mb_substr($e->getMessage(), -4096)));
$build = Authorization::skip(fn () => $database->updateDocument('builds', $buildId, $build)); $build = $database->updateDocument('builds', $buildId, $build);
// also remove the container if it exists // also remove the container if it exists
if ($id) { if ($id) {
@ -1241,20 +1163,20 @@ App::setMode(App::MODE_TYPE_PRODUCTION); // Define Mode
$http = new Server("0.0.0.0", 8080); $http = new Server("0.0.0.0", 8080);
$handleShutdown = function() use($logError) function handleShutdown() {
{ global $orchestration;
global $register;
try { try {
Console::info('Cleaning up containers before shutdown...'); Console::info('Cleaning up containers before shutdown...');
// Remove all containers. // Remove all containers.
global $orchestration;
global $register;
$functionsToRemove = $orchestration->list(['label' => 'appwrite-type=function']); $functionsToRemove = $orchestration->list(['label' => 'appwrite-type=function']);
foreach ($functionsToRemove as $container) { foreach ($functionsToRemove as $container) {
$orchestration->remove($container->getId(), true); go(fn () => $orchestration->remove($container->getId(), true));
// Get a database instance // Get a database instance
$db = $register->get('dbPool')->get(); $db = $register->get('dbPool')->get();
@ -1263,57 +1185,54 @@ $handleShutdown = function() use($logError)
$cache = new Cache(new RedisCache($cache)); $cache = new Cache(new RedisCache($cache));
$database = new Database(new MariaDB($db), $cache); $database = new Database(new MariaDB($db), $cache);
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite')); $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$database->setNamespace('_project_'.$container->getLabels()["appwrite-project"]); $database->setNamespace('_project_' . $container->getLabels()["appwrite-project"]);
// Get list of all processing executions // Get list of all processing executions
$executions = Authorization::skip(function () use ($database, $container) { $executions = $database->find('executions', [
return $database->find('executions', [ new Query('tagId', Query::TYPE_EQUAL, [$container->getLabels()["appwrite-tag"]]),
new Query('tagId', Query::TYPE_EQUAL, [$container->getLabels()["appwrite-tag"]]), new Query('status', Query::TYPE_EQUAL, ['waiting'])
new Query('status', Query::TYPE_EQUAL, ['waiting']) ]);
]);
});
// Mark all processing executions as failed // Mark all processing executions as failed
foreach ($executions as $execution) { foreach ($executions as $execution) {
$execution->setAttribute('status', 'failed') $execution
->setAttribute('status', 'failed')
->setAttribute('statusCode', 1) ->setAttribute('statusCode', 1)
->setAttribute('stderr', 'Appwrite was shutdown during execution'); ->setAttribute('stderr', 'Appwrite was shutdown during execution');
Authorization::skip(function () use ($database, $execution) { $database->updateDocument('executions', $execution->getId(), $execution);
$database->updateDocument('executions', $execution->getId(), $execution);
});
} }
Console::info('Removed container ' . $container->getName()); Console::info('Removed container ' . $container->getName());
} }
} catch(\Throwable $error) { } catch (\Throwable $error) {
call_user_func($logError, $error, "shutdownError"); logError($error, 'shutdownError');
} }
}; };
$http->on('start', function ($http) use($handleShutdown) { $http->on('start', function ($http) {
Process::signal(SIGINT, function () use ($http, $handleShutdown) { @Process::signal(SIGINT, function () use ($http) {
$handleShutdown(); handleShutdown();
$http->shutdown(); $http->shutdown();
}); });
Process::signal(SIGQUIT, function () use ($http, $handleShutdown) { @Process::signal(SIGQUIT, function () use ($http) {
$handleShutdown(); handleShutdown();
$http->shutdown(); $http->shutdown();
}); });
Process::signal(SIGKILL, function () use ($http, $handleShutdown) { @Process::signal(SIGKILL, function () use ($http) {
$handleShutdown(); handleShutdown();
$http->shutdown(); $http->shutdown();
}); });
Process::signal(SIGTERM, function () use ($http, $handleShutdown) { @Process::signal(SIGTERM, function () use ($http) {
$handleShutdown(); handleShutdown();
$http->shutdown(); $http->shutdown();
}); });
}); });
$http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swooleResponse) use($logError) { $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swooleResponse) {
global $register; global $register;
$request = new Request($swooleRequest); $request = new Request($swooleRequest);
@ -1321,16 +1240,10 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
$app = new App('UTC'); $app = new App('UTC');
$db = $register->get('dbPool')->get(); $db = $register->get('dbPool')->get();
App::setResource('db', function () use (&$db) {
return $db;
});
$redis = $register->get('redisPool')->get(); $redis = $register->get('redisPool')->get();
App::setResource('cache', function () use (&$redis) { App::setResource('db', fn () => $db);
return $redis; App::setResource('cache', fn () => $redis);
});
$projectId = $request->getHeader('x-appwrite-project', ''); $projectId = $request->getHeader('x-appwrite-project', '');
@ -1355,12 +1268,12 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
$database = new Database(new MariaDB($db), $cache); $database = new Database(new MariaDB($db), $cache);
$database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite')); $database->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$database->setNamespace('_project_'.$projectId); $database->setNamespace('_project_' . $projectId);
return $database; return $database;
}, ['db', 'cache']); }, ['db', 'cache']);
App::error(function ($error, $utopia, $request, $response) use ($logError) { App::error(function ($error, $utopia, $request, $response) {
/** @var Exception $error */ /** @var Exception $error */
/** @var Utopia\App $utopia */ /** @var Utopia\App $utopia */
/** @var Utopia\Swoole\Request $request */ /** @var Utopia\Swoole\Request $request */
@ -1371,7 +1284,7 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
} }
$route = $utopia->match($request); $route = $utopia->match($request);
call_user_func($logError, $error, "httpError", $route); logError($error, "httpError", $route);
$version = App::getEnv('_APP_VERSION', 'UNKNOWN'); $version = App::getEnv('_APP_VERSION', 'UNKNOWN');
@ -1410,7 +1323,7 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
try { try {
$app->run($request, $response); $app->run($request, $response);
} catch (Exception $e) { } catch (Exception $e) {
call_user_func($logError, $e, "serverError"); logError($e, "serverError");
$swooleResponse->end('500: Server Error'); $swooleResponse->end('500: Server Error');
} finally { } finally {
/** @var PDOPool $dbPool */ /** @var PDOPool $dbPool */
@ -1424,4 +1337,3 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
}); });
$http->start(); $http->start();

View file

@ -155,6 +155,50 @@ services:
- _APP_DB_PASS - _APP_DB_PASS
- _APP_USAGE_STATS - _APP_USAGE_STATS
appwrite-executor:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: executor
container_name: appwrite-executor
stop_signal: SIGINT
networks:
appwrite:
runtimes:
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- appwrite-functions:/storage/functions:rw
- /tmp:/tmp:rw
depends_on:
- redis
- mariadb
- appwrite
environment:
- _APP_ENV
- _APP_OPENSSL_KEY_V1
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_FUNCTIONS_TIMEOUT
- _APP_FUNCTIONS_BUILD_TIMEOUT
- _APP_FUNCTIONS_CONTAINERS
- _APP_FUNCTIONS_RUNTIMES
- _APP_FUNCTIONS_CPUS
- _APP_FUNCTIONS_MEMORY
- _APP_FUNCTIONS_MEMORY_SWAP
- _APP_EXECUTOR_SECRET
- _APP_USAGE_STATS
- _APP_STATSD_HOST
- _APP_STATSD_PORT
- DOCKERHUB_PULL_USERNAME
- DOCKERHUB_PULL_PASSWORD
- _APP_LOGGING_PROVIDER
- _APP_LOGGING_CONFIG
appwrite-worker-database: appwrite-worker-database:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?> image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: worker-database entrypoint: worker-database

View file

@ -359,6 +359,7 @@ services:
- DOCKERHUB_PULL_PASSWORD - DOCKERHUB_PULL_PASSWORD
appwrite-executor: appwrite-executor:
container_name: appwrite-executor
entrypoint: entrypoint:
- php - php
- -e - -e