Executor now marks all processing executions as failed on shutdown
Executor now marks all processing executions as failed on shutdown
This commit is contained in:
parent
7649a2c677
commit
01ecbe60e4
1 changed files with 66 additions and 16 deletions
|
@ -29,6 +29,7 @@ use Utopia\Storage\Device\Local;
|
|||
use Utopia\Storage\Storage;
|
||||
use Swoole\Coroutine as Co;
|
||||
use Utopia\Cache\Cache;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Orchestration\Adapter\DockerCLI;
|
||||
|
||||
require_once __DIR__ . '/init.php';
|
||||
|
@ -132,10 +133,12 @@ App::post('/v1/cleanup/function')
|
|||
global $orchestration;
|
||||
|
||||
try {
|
||||
// Get function document
|
||||
$function = Authorization::skip(function () use ($dbForInternal, $functionId) {
|
||||
return $dbForInternal->getDocument('functions', $functionId);
|
||||
});
|
||||
|
||||
// Check if function exists
|
||||
if ($function->isEmpty()) {
|
||||
throw new Exception('Function not found', 404);
|
||||
}
|
||||
|
@ -188,15 +191,18 @@ App::post('/v1/cleanup/tag')
|
|||
global $orchestration;
|
||||
|
||||
try {
|
||||
// Get tag document
|
||||
$tag = Authorization::skip(function () use ($dbForInternal, $tagId) {
|
||||
return $dbForInternal->getDocument('tags', $tagId);
|
||||
});
|
||||
|
||||
// Check if tag exists
|
||||
if ($tag->isEmpty()) {
|
||||
throw new Exception('Tag not found', 404);
|
||||
}
|
||||
|
||||
try {
|
||||
// Remove the container of the tag
|
||||
$orchestration->remove('appwrite-function-' . $tag['$id'], true);
|
||||
Console::info('Removed container for tag ' . $tag['$id']);
|
||||
} catch (Exception $e) {
|
||||
|
@ -217,14 +223,17 @@ App::post('/v1/tag')
|
|||
->inject('dbForInternal')
|
||||
->inject('projectID')
|
||||
->action(function ($functionId, $tagId, $response, $dbForInternal, $projectID) {
|
||||
$function = Authorization::skip(function() use ($functionId, $dbForInternal) {
|
||||
// Get function document
|
||||
$function = Authorization::skip(function () use ($functionId, $dbForInternal) {
|
||||
return $dbForInternal->getDocument('functions', $functionId);
|
||||
});
|
||||
|
||||
$tag = Authorization::skip(function() use ($tagId, $dbForInternal) {
|
||||
// Get tag document
|
||||
$tag = Authorization::skip(function () use ($tagId, $dbForInternal) {
|
||||
return $dbForInternal->getDocument('tags', $tagId);
|
||||
});
|
||||
|
||||
// Check if both documents exist
|
||||
if ($function->isEmpty()) {
|
||||
throw new Exception('Function not found', 404);
|
||||
}
|
||||
|
@ -233,11 +242,13 @@ App::post('/v1/tag')
|
|||
throw new Exception('Tag not found', 404);
|
||||
}
|
||||
|
||||
// Update the schedule
|
||||
$schedule = $function->getAttribute('schedule', '');
|
||||
$cron = (empty($function->getAttribute('tag')) && !empty($schedule)) ? new CronExpression($schedule) : null;
|
||||
$next = (empty($function->getAttribute('tag')) && !empty($schedule)) ? $cron->getNextRunDate()->format('U') : 0;
|
||||
|
||||
$function = Authorization::skip(function() use ($function, $dbForInternal, $tag, $next) {
|
||||
// Update the function document setting the tag as the active one
|
||||
$function = Authorization::skip(function () use ($function, $dbForInternal, $tag, $next) {
|
||||
return $function = $dbForInternal->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [
|
||||
'tag' => $tag->getId(),
|
||||
'scheduleNext' => (int)$next,
|
||||
|
@ -318,6 +329,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
$tagPathTargetDir = \pathinfo($tagPathTarget, PATHINFO_DIRNAME);
|
||||
$container = 'build-stage-' . $tag->getId();
|
||||
|
||||
// Perform various checks
|
||||
if (!\is_readable($tagPath)) {
|
||||
throw new Exception('Code is not readable: ' . $tag->getAttribute('path', ''));
|
||||
}
|
||||
|
@ -334,6 +346,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
}
|
||||
}
|
||||
|
||||
// Set build container's environment variables
|
||||
$vars = \array_merge($function->getAttribute('vars', []), [
|
||||
'APPWRITE_FUNCTION_ID' => $function->getId(),
|
||||
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
|
||||
|
@ -344,6 +357,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
'APPWRITE_ENTRYPOINT_NAME' => $tag->getAttribute('entrypoint')
|
||||
]);
|
||||
|
||||
// Start tracking time
|
||||
$buildStart = \microtime(true);
|
||||
$buildTime = \time();
|
||||
|
||||
|
@ -361,6 +375,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
}
|
||||
};
|
||||
|
||||
// Launch build container
|
||||
$id = $orchestration->run(
|
||||
image: $runtime['base'],
|
||||
name: $container,
|
||||
|
@ -370,6 +385,8 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
'appwrite-type' => 'function',
|
||||
'appwrite-created' => strval($buildTime),
|
||||
'appwrite-runtime' => $function->getAttribute('runtime', ''),
|
||||
'appwrite-project' => $projectID,
|
||||
'appwrite-tag' => $tagID
|
||||
],
|
||||
command: [
|
||||
'tail',
|
||||
|
@ -383,6 +400,7 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
]
|
||||
);
|
||||
|
||||
// Extract user code into build container
|
||||
$untarStdout = '';
|
||||
$untarStderr = '';
|
||||
|
||||
|
@ -459,9 +477,9 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
}
|
||||
|
||||
$tag->setAttribute('buildPath', $path)
|
||||
->setAttribute('status', 'ready')
|
||||
->setAttribute('buildStdout', \utf8_encode(\mb_substr($buildStdout, -4096)))
|
||||
->setAttribute('buildStderr', \utf8_encode(\mb_substr($buildStderr, -4096)));
|
||||
->setAttribute('status', 'ready')
|
||||
->setAttribute('buildStdout', \utf8_encode(\mb_substr($buildStdout, -4096)))
|
||||
->setAttribute('buildStderr', \utf8_encode(\mb_substr($buildStderr, -4096)));
|
||||
|
||||
// Update tag with built code attribute
|
||||
$tag = Authorization::skip(function () use ($tag, $tagID, $database) {
|
||||
|
@ -475,8 +493,8 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
Console::error('Tag build failed: ' . $e->getMessage());
|
||||
|
||||
$tag->setAttribute('status', 'failed')
|
||||
->setAttribute('buildStdout', \utf8_encode(\mb_substr($buildStdout, -4096)))
|
||||
->setAttribute('buildStderr', \utf8_encode(\mb_substr($buildStderr, -4096)));
|
||||
->setAttribute('buildStdout', \utf8_encode(\mb_substr($buildStdout, -4096)))
|
||||
->setAttribute('buildStderr', \utf8_encode(\mb_substr($buildStderr, -4096)));
|
||||
|
||||
Authorization::skip(function () use ($tag, $tagID, $database) {
|
||||
return $database->updateDocument('tags', $tagID, $tag);
|
||||
|
@ -493,7 +511,6 @@ function runBuildStage(string $tagID, Document $function, string $projectID, Dat
|
|||
|
||||
function createRuntimeServer(string $functionId, string $projectId, Document $tag, Database $database)
|
||||
{
|
||||
global $register;
|
||||
global $orchestration;
|
||||
global $runtimes;
|
||||
global $activeFunctions;
|
||||
|
@ -604,6 +621,7 @@ function createRuntimeServer(string $functionId, string $projectId, Document $ta
|
|||
$vars[$key] = strval($value);
|
||||
}
|
||||
|
||||
// Launch runtime server
|
||||
$id = $orchestration->run(
|
||||
image: $runtime['image'],
|
||||
name: $container,
|
||||
|
@ -612,6 +630,8 @@ function createRuntimeServer(string $functionId, string $projectId, Document $ta
|
|||
'appwrite-type' => 'function',
|
||||
'appwrite-created' => strval($executionTime),
|
||||
'appwrite-runtime' => $function->getAttribute('runtime', ''),
|
||||
'appwrite-project' => $projectId,
|
||||
'appwrite-tag' => $tag->getId(),
|
||||
],
|
||||
hostname: $container,
|
||||
mountFolder: $tagPathTargetDir,
|
||||
|
@ -910,7 +930,7 @@ function execute(string $trigger, string $projectId, string $executionId, string
|
|||
roles: $target['roles']
|
||||
);
|
||||
|
||||
if(App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') {
|
||||
$statsd = $register->get('statsd');
|
||||
|
||||
$usage = new Stats($statsd);
|
||||
|
@ -923,8 +943,7 @@ function execute(string $trigger, string $projectId, string $executionId, string
|
|||
->setParam('functionExecutionTime', $executionTime * 1000) // ms
|
||||
->setParam('networkRequestSize', 0)
|
||||
->setParam('networkResponseSize', 0)
|
||||
->submit()
|
||||
;
|
||||
->submit();
|
||||
|
||||
$usage->submit();
|
||||
}
|
||||
|
@ -998,11 +1017,11 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
|
|||
return $swooleResponse->end('401: Authentication Error');
|
||||
}
|
||||
|
||||
App::setResource('dbForInternal', function($db, $cache) use ($projectId) {
|
||||
App::setResource('dbForInternal', function ($db, $cache) use ($projectId) {
|
||||
$cache = new Cache(new RedisCache($cache));
|
||||
|
||||
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setNamespace('project_'.$projectId.'_internal');
|
||||
$database->setNamespace('project_' . $projectId . '_internal');
|
||||
|
||||
return $database;
|
||||
}, ['db', 'cache']);
|
||||
|
@ -1073,7 +1092,7 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo
|
|||
/** @var PDOPool $dbPool */
|
||||
$dbPool = $register->get('dbPool');
|
||||
$dbPool->put($db);
|
||||
|
||||
|
||||
/** @var RedisPool $redisPool */
|
||||
$redisPool = $register->get('redisPool');
|
||||
$redisPool->put($redis);
|
||||
|
@ -1089,11 +1108,42 @@ function handleShutdown()
|
|||
// Remove all containers.
|
||||
global $orchestration;
|
||||
|
||||
global $register;
|
||||
|
||||
$functionsToRemove = $orchestration->list(['label' => 'appwrite-type=function']);
|
||||
|
||||
foreach ($functionsToRemove as $container) {
|
||||
try {
|
||||
$orchestration->remove($container->getId(), true);
|
||||
|
||||
// Get a database instance
|
||||
$db = $register->get('dbPool')->get();
|
||||
$cache = $register->get('redisPool')->get();
|
||||
|
||||
$cache = new Cache(new RedisCache($cache));
|
||||
|
||||
$database = new Database(new MariaDB($db), $cache);
|
||||
$database->setNamespace('project_'.$container->getLabels()["appwrite-project"].'_internal');
|
||||
|
||||
// Get list of all processing executions
|
||||
$executions = Authorization::skip(function () use ($database, $container) {
|
||||
return $database->find('executions', [
|
||||
new Query('tagId', Query::TYPE_EQUAL, [$container->getLabels()["appwrite-tag"]]),
|
||||
new Query('status', Query::TYPE_EQUAL, ['waiting'])
|
||||
]);
|
||||
});
|
||||
|
||||
// Mark all processing executions as failed
|
||||
foreach ($executions as $execution) {
|
||||
$execution->setAttribute('status', 'failed')
|
||||
->setAttribute('exitCode', 1)
|
||||
->setAttribute('stderr', 'Appwrite was shutdown during execution');
|
||||
|
||||
Authorization::skip(function () use ($database, $execution) {
|
||||
$database->updateDocument('executions', $execution->getId(), $execution);
|
||||
});
|
||||
}
|
||||
|
||||
Console::info('Removed container ' . $container->getName());
|
||||
} catch (Exception $e) {
|
||||
Console::error('Failed to remove container: ' . $container->getName());
|
||||
|
|
Loading…
Reference in a new issue