1
0
Fork 0
mirror of synced 2024-06-03 11:24:48 +12:00

test: connection pool for docker socket

This commit is contained in:
Torsten Dittmann 2022-01-21 15:33:58 +01:00
parent cc7b15ddf1
commit 12db6816f0
3 changed files with 222 additions and 168 deletions

View file

@ -25,6 +25,7 @@ use Utopia\Validator\ArrayList;
use Utopia\Validator\JSON;
use Utopia\Validator\Text;
use Cron\CronExpression;
use Swoole\ConnectionPool;
use Utopia\Storage\Device\Local;
use Utopia\Storage\Storage;
use Swoole\Coroutine as Co;
@ -32,6 +33,7 @@ use Utopia\Cache\Cache;
use Utopia\Database\Query;
use Utopia\Orchestration\Adapter\DockerCLI;
use Utopia\Logger\Log;
use Utopia\Registry\Registry;
require_once __DIR__ . '/init.php';
@ -81,18 +83,22 @@ function logError(Throwable $error, string $action, Utopia\Route $route = null)
Console::error('[Error] Line: ' . $error->getLine());
};
try {
$orchestrationPool = new ConnectionPool(function () {
$dockerUser = App::getEnv('DOCKERHUB_PULL_USERNAME', null);
$dockerPass = App::getEnv('DOCKERHUB_PULL_PASSWORD', null);
$dockerEmail = App::getEnv('DOCKERHUB_PULL_EMAIL', null);
$orchestration = new Orchestration(new DockerCLI($dockerUser, $dockerPass));
return $orchestration;
}, 2);
try {
$runtimes = Config::getParam('runtimes');
// Warmup: make sure images are ready to run fast 🚀
Co\run(function () use ($runtimes, $orchestration) {
Co\run(function () use ($runtimes, $orchestrationPool) {
foreach ($runtimes as $runtime) {
go(function () use ($runtime, $orchestration) {
go(function () use ($runtime, $orchestrationPool) {
$orchestration = $orchestrationPool->get();
Console::info('Warming up ' . $runtime['name'] . ' ' . $runtime['version'] . ' environment...');
$response = $orchestration->pull($runtime['image']);
@ -102,6 +108,8 @@ try {
} else {
Console::warning("Failed to Warmup {$runtime['name']} {$runtime['version']}!");
}
$orchestrationPool->put($orchestration);
});
}
});
@ -113,10 +121,12 @@ try {
$activeFunctions->column('key', Swoole\Table::TYPE_STRING, 4096);
$activeFunctions->create();
Co\run(function () use ($orchestration, $activeFunctions) {
Co\run(function () use ($orchestrationPool, $activeFunctions) {
$orchestration = $orchestrationPool->get();
$executionStart = \microtime(true);
$residueList = $orchestration->list(['label' => 'appwrite-type=function']);
$orchestrationPool->put($orchestration);
foreach ($residueList as $value) {
go(fn () => $activeFunctions->set($value->getName(), [
@ -128,7 +138,6 @@ try {
}
$executionEnd = \microtime(true);
Console::info(count($activeFunctions) . ' functions listed in ' . ($executionEnd - $executionStart) . ' seconds');
});
} catch (\Throwable $error) {
@ -137,164 +146,172 @@ try {
function createRuntimeServer(string $functionId, string $projectId, string $tagId, Database $database): void
{
global $orchestration;
global $orchestrationPool;
global $runtimes;
global $activeFunctions;
$function = $database->getDocument('functions', $functionId);
$tag = $database->getDocument('tags', $tagId);
try {
$orchestration = $orchestrationPool->get();
$function = $database->getDocument('functions', $functionId);
$tag = $database->getDocument('tags', $tagId);
if ($tag->getAttribute('buildId') === null) {
throw new Exception('Tag has no buildId');
}
// Grab Build Document
$build = $database->getDocument('builds', $tag->getAttribute('buildId'));
// Check if function isn't already created
$functions = $orchestration->list(['label' => 'appwrite-type=function', 'name' => 'appwrite-function-' . $tag->getId()]);
if (\count($functions) > 0) {
return;
}
// Generate random secret key
$secret = \bin2hex(\random_bytes(16));
// Check if runtime is active
$runtime = $runtimes[$function->getAttribute('runtime', '')] ?? null;
if ($tag->getAttribute('functionId') !== $function->getId()) {
throw new Exception('Tag not found', 404);
}
if (\is_null($runtime)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
}
// Process environment variables
$vars = \array_merge($function->getAttribute('vars', []), [
'APPWRITE_FUNCTION_ID' => $function->getId(),
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
'APPWRITE_FUNCTION_TAG' => $tag->getId(),
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'],
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'],
'APPWRITE_FUNCTION_PROJECT_ID' => $projectId,
'INTERNAL_RUNTIME_KEY' => $secret
]);
$vars = \array_merge($vars, $build->getAttribute('envVars', [])); // for gettng endpoint.
$container = 'appwrite-function-' . $tag->getId();
if ($activeFunctions->exists($container) && !(\substr($activeFunctions->get($container)['status'], 0, 2) === 'Up')) { // Remove container if not online
// If container is online then stop and remove it
try {
$orchestration->remove($container, true);
} catch (Exception $e) {
try {
throw new Exception('Failed to remove container: ' . $e->getMessage());
} catch (Throwable $error) {
logError($error, "createRuntimeServer");
}
if ($tag->getAttribute('buildId') === null) {
throw new Exception('Tag has no buildId');
}
$activeFunctions->del($container);
}
// Grab Build Document
$build = $database->getDocument('builds', $tag->getAttribute('buildId'));
// Check if tag hasn't failed
if ($build->getAttribute('status') === 'failed') {
throw new Exception('Tag build failed, please check your logs.', 500);
}
// Check if function isn't already created
$functions = $orchestration->list(['label' => 'appwrite-type=function', 'name' => 'appwrite-function-' . $tag->getId()]);
// Check if tag is built yet.
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Tag is not built yet', 500);
}
// Grab Tag Files
$tagPath = $build->getAttribute('outputPath', '');
$tagPathTarget = '/tmp/project-' . $projectId . '/' . $build->getId() . '/builtCode/code.tar.gz';
$tagPathTargetDir = \pathinfo($tagPathTarget, PATHINFO_DIRNAME);
$container = 'appwrite-function-' . $tag->getId();
$device = Storage::getDevice('builds');
if (!\file_exists($tagPathTargetDir)) {
if (!\mkdir($tagPathTargetDir, 0777, true)) {
throw new Exception('Can\'t create directory ' . $tagPathTargetDir);
}
}
if (!\file_exists($tagPathTarget)) {
if (App::getEnv('_APP_STORAGE_DEVICE', Storage::DEVICE_LOCAL) === Storage::DEVICE_LOCAL) {
if (!\copy($tagPath, $tagPathTarget)) {
throw new Exception('Can\'t create temporary code file ' . $tagPathTarget);
}
} else {
$buffer = $device->read($tagPath);
\file_put_contents($tagPathTarget, $buffer);
}
};
/**
* Limit CPU Usage - DONE
* Limit Memory Usage - DONE
* Limit Network Usage
* Limit Storage Usage (//--storage-opt size=120m \)
* Make sure no access to redis, mariadb, influxdb or other system services
* Make sure no access to NFS server / storage volumes
* Access Appwrite REST from internal network for improved performance
*/
if (!$activeFunctions->exists($container)) { // Create contianer if not ready
$executionStart = \microtime(true);
$executionTime = \time();
$orchestration
->setCpus(App::getEnv('_APP_FUNCTIONS_CPUS', '1'))
->setMemory(App::getEnv('_APP_FUNCTIONS_MEMORY', '256'))
->setSwap(App::getEnv('_APP_FUNCTIONS_MEMORY_SWAP', '256'));
foreach ($vars as $key => $value) {
$vars[$key] = strval($value);
if (\count($functions) > 0) {
return;
}
// Launch runtime server
$id = $orchestration->run(
image: $runtime['image'],
name: $container,
vars: $vars,
labels: [
'appwrite-type' => 'function',
'appwrite-created' => strval($executionTime),
'appwrite-runtime' => $function->getAttribute('runtime', ''),
'appwrite-project' => $projectId,
'appwrite-tag' => $tag->getId(),
],
hostname: $container,
mountFolder: $tagPathTargetDir,
);
// Generate random secret key
$secret = \bin2hex(\random_bytes(16));
if (empty($id)) {
throw new Exception('Failed to create container');
// Check if runtime is active
$runtime = $runtimes[$function->getAttribute('runtime', '')] ?? null;
if ($tag->getAttribute('functionId') !== $function->getId()) {
throw new Exception('Tag not found', 404);
}
// Add to network
$orchestration->networkConnect($container, 'appwrite_runtimes');
if (\is_null($runtime)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
}
$executionEnd = \microtime(true);
$activeFunctions->set($container, [
'id' => $id,
'name' => $container,
'status' => 'Up ' . \round($executionEnd - $executionStart, 2) . 's',
'key' => $secret,
// Process environment variables
$vars = \array_merge($function->getAttribute('vars', []), [
'APPWRITE_FUNCTION_ID' => $function->getId(),
'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name', ''),
'APPWRITE_FUNCTION_TAG' => $tag->getId(),
'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'],
'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'],
'APPWRITE_FUNCTION_PROJECT_ID' => $projectId,
'INTERNAL_RUNTIME_KEY' => $secret
]);
Console::info('Runtime Server created in ' . ($executionEnd - $executionStart) . ' seconds');
} else {
Console::info('Runtime server is ready to run');
$vars = \array_merge($vars, $build->getAttribute('envVars', [])); // for gettng endpoint.
$container = 'appwrite-function-' . $tag->getId();
if ($activeFunctions->exists($container) && !(\substr($activeFunctions->get($container)['status'], 0, 2) === 'Up')) { // Remove container if not online
// If container is online then stop and remove it
try {
$orchestration->remove($container, true);
} catch (Exception $e) {
try {
throw new Exception('Failed to remove container: ' . $e->getMessage());
} catch (Throwable $error) {
logError($error, "createRuntimeServer");
}
}
$activeFunctions->del($container);
}
// Check if tag hasn't failed
if ($build->getAttribute('status') === 'failed') {
throw new Exception('Tag build failed, please check your logs.', 500);
}
// Check if tag is built yet.
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Tag is not built yet', 500);
}
// Grab Tag Files
$tagPath = $build->getAttribute('outputPath', '');
$tagPathTarget = '/tmp/project-' . $projectId . '/' . $build->getId() . '/builtCode/code.tar.gz';
$tagPathTargetDir = \pathinfo($tagPathTarget, PATHINFO_DIRNAME);
$container = 'appwrite-function-' . $tag->getId();
$device = Storage::getDevice('builds');
if (!\file_exists($tagPathTargetDir)) {
if (!\mkdir($tagPathTargetDir, 0777, true)) {
throw new Exception('Can\'t create directory ' . $tagPathTargetDir);
}
}
if (!\file_exists($tagPathTarget)) {
if (App::getEnv('_APP_STORAGE_DEVICE', Storage::DEVICE_LOCAL) === Storage::DEVICE_LOCAL) {
if (!\copy($tagPath, $tagPathTarget)) {
throw new Exception('Can\'t create temporary code file ' . $tagPathTarget);
}
} else {
$buffer = $device->read($tagPath);
\file_put_contents($tagPathTarget, $buffer);
}
};
/**
* Limit CPU Usage - DONE
* Limit Memory Usage - DONE
* Limit Network Usage
* Limit Storage Usage (//--storage-opt size=120m \)
* Make sure no access to redis, mariadb, influxdb or other system services
* Make sure no access to NFS server / storage volumes
* Access Appwrite REST from internal network for improved performance
*/
if (!$activeFunctions->exists($container)) { // Create contianer if not ready
$executionStart = \microtime(true);
$executionTime = \time();
$orchestration
->setCpus(App::getEnv('_APP_FUNCTIONS_CPUS', '1'))
->setMemory(App::getEnv('_APP_FUNCTIONS_MEMORY', '256'))
->setSwap(App::getEnv('_APP_FUNCTIONS_MEMORY_SWAP', '256'));
foreach ($vars as $key => $value) {
$vars[$key] = strval($value);
}
// Launch runtime server
$id = $orchestration->run(
image: $runtime['image'],
name: $container,
vars: $vars,
labels: [
'appwrite-type' => 'function',
'appwrite-created' => strval($executionTime),
'appwrite-runtime' => $function->getAttribute('runtime', ''),
'appwrite-project' => $projectId,
'appwrite-tag' => $tag->getId(),
],
hostname: $container,
mountFolder: $tagPathTargetDir,
);
if (empty($id)) {
throw new Exception('Failed to create container');
}
// Add to network
$orchestration->networkConnect($container, 'appwrite_runtimes');
$executionEnd = \microtime(true);
$activeFunctions->set($container, [
'id' => $id,
'name' => $container,
'status' => 'Up ' . \round($executionEnd - $executionStart, 2) . 's',
'key' => $secret,
]);
Console::info('Runtime Server created in ' . ($executionEnd - $executionStart) . ' seconds');
} else {
Console::info('Runtime server is ready to run');
}
} catch (\Throwable $th) {
$orchestrationPool->put($orchestration);
throw $th;
} finally {
$orchestrationPool->put($orchestration);
}
};
@ -410,7 +427,7 @@ function execute(string $trigger, string $projectId, string $executionId, string
$database->updateDocument('tags', $tag->getId(), $tag);
runBuildStage($buildId, $projectId, $database);
runBuildStage($buildId, $projectId);
}
} catch (Exception $e) {
$execution
@ -671,8 +688,11 @@ App::post('/v1/cleanup/function')
->inject('response')
->inject('dbForProject')
->action(
function (string $functionId, Response $response, Database $dbForProject) use ($orchestration) {
function (string $functionId, Response $response, Database $dbForProject) use ($orchestrationPool) {
try {
/** @var Orchestration $orchestration */
$orchestration = $orchestrationPool->get();
// Get function document
$function = $dbForProject->getDocument('functions', $functionId);
@ -712,9 +732,11 @@ App::post('/v1/cleanup/function')
return $response->json(['success' => true]);
} catch (Exception $e) {
logError($e, "cleanupFunction");
$orchestrationPool->put($orchestration);
return $response->json(['error' => $e->getMessage()]);
}
$orchestrationPool->put($orchestration);
}
);
@ -722,8 +744,11 @@ App::post('/v1/cleanup/tag')
->param('tagId', '', new UID(), 'Tag unique ID.')
->inject('response')
->inject('dbForProject')
->action(function (string $tagId, Response $response, Database $dbForProject) use ($orchestration) {
->action(function (string $tagId, Response $response, Database $dbForProject) use ($orchestrationPool) {
try {
/** @var Orchestration $orchestration */
$orchestration = $orchestrationPool->get();
// Get tag document
$tag = $dbForProject->getDocument('tags', $tagId);
@ -752,8 +777,11 @@ App::post('/v1/cleanup/tag')
}
} catch (Exception $e) {
logError($e, "cleanupFunction");
$orchestrationPool->put($orchestration);
return $response->json(['error' => $e->getMessage()]);
}
$orchestrationPool->put($orchestration);
return $response->json(['success' => true]);
});
@ -765,7 +793,8 @@ App::post('/v1/tag')
->inject('response')
->inject('dbForProject')
->inject('projectID')
->action(function (string $functionId, string $tagId, string $userId, Response $response, Database $dbForProject, string $projectID) use ($runtimes) {
->inject('register')
->action(function (string $functionId, string $tagId, string $userId, Response $response, Database $dbForProject, string $projectID, Registry $register) use ($runtimes) {
// Get function document
$function = $dbForProject->getDocument('functions', $functionId);
// Get tag document
@ -826,9 +855,16 @@ App::post('/v1/tag')
}
// Build Code
go(function () use ($dbForProject, $projectID, $tagId, $buildId, $functionId, $function) {
go(function () use ($projectID, $tagId, $buildId, $functionId, $function, $register) {
$db = $register->get('dbPool')->get();
$redis = $register->get('redisPool')->get();
$cache = new Cache(new RedisCache($redis));
$dbForProject = new Database(new MariaDB($db), $cache);
$dbForProject->setDefaultDatabase(App::getEnv('_APP_DB_SCHEMA', 'appwrite'));
$dbForProject->setNamespace('_project_' . $projectID);
// Build Code
runBuildStage($buildId, $projectID, $dbForProject);
runBuildStage($buildId, $projectID);
// Update the schedule
$schedule = $function->getAttribute('schedule', '');
@ -856,6 +892,9 @@ App::post('/v1/tag')
// Deploy Runtime Server
createRuntimeServer($functionId, $projectID, $tagId, $dbForProject);
$register->get('dbPool')->put($db);
$register->get('redisPool')->put($redis);
});
if (false === $function) {
@ -867,7 +906,8 @@ App::post('/v1/tag')
App::get('/v1/')
->inject('response')
->action(function (Response $response) {
->action(
function (Response $response) {
$response
->addHeader('Cache-Control', 'no-cache, no-store, must-revalidate')
->addHeader('Expires', '0')
@ -920,12 +960,15 @@ App::post('/v1/build/:buildId') // Start a Build
}
});
function runBuildStage(string $buildId, string $projectID, Database $database): Document
function runBuildStage(string $buildId, string $projectID): Document
{
global $runtimes;
global $orchestration;
global $orchestrationPool;
global $register;
/** @var Orchestration $orchestration */
$orchestration = $orchestrationPool->get();
$buildStdout = '';
$buildStderr = '';
@ -1149,12 +1192,18 @@ function runBuildStage(string $buildId, string $projectID, Database $database):
$build = $database->updateDocument('builds', $buildId, $build);
// also remove the container if it exists
if ($id) {
if (isset($id)) {
$orchestration->remove($id, true);
}
$orchestrationPool->put(null);
$register->get('dbPool')->put($db);
$register->get('redisPool')->put($redis);
throw new Exception('Build failed: ' . $e->getMessage());
} finally {
$orchestrationPool->put($orchestration);
$register->get('dbPool')->put($db);
$register->get('redisPool')->put($redis);
}
@ -1166,8 +1215,9 @@ App::setMode(App::MODE_TYPE_PRODUCTION); // Define Mode
$http = new Server("0.0.0.0", 8080);
function handleShutdown() {
global $orchestration;
function handleShutdown()
{
global $orchestrationPool;
global $register;
try {
@ -1175,6 +1225,8 @@ function handleShutdown() {
// Remove all containers.
/** @var Orchestration $orchestration */
$orchestration = $orchestrationPool->get();
$functionsToRemove = $orchestration->list(['label' => 'appwrite-type=function']);
@ -1210,6 +1262,8 @@ function handleShutdown() {
}
} catch (\Throwable $error) {
logError($error, 'shutdownError');
} finally {
$orchestrationPool->put($orchestration);
}
};

View file

@ -72,7 +72,7 @@
"require-dev": {
"appwrite/sdk-generator": "0.17.1",
"phpunit/phpunit": "9.5.10",
"swoole/ide-helper": "4.8.3",
"swoole/ide-helper": "4.8.5",
"textalk/websocket": "1.5.5",
"vimeo/psalm": "4.13.1"
},

14
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "adf8727742248da9d7143546e513f96d",
"content-hash": "cba39f50398d5ae2b121db34c9e4c529",
"packages": [
{
"name": "adhocore/jwt",
@ -5676,16 +5676,16 @@
},
{
"name": "swoole/ide-helper",
"version": "4.8.3",
"version": "4.8.5",
"source": {
"type": "git",
"url": "https://github.com/swoole/ide-helper.git",
"reference": "3ac4971814273889933b871e03b2a6b340e58f79"
"reference": "d03c707d4dc803228e93b4884c72949c4d28e8b8"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/swoole/ide-helper/zipball/3ac4971814273889933b871e03b2a6b340e58f79",
"reference": "3ac4971814273889933b871e03b2a6b340e58f79",
"url": "https://api.github.com/repos/swoole/ide-helper/zipball/d03c707d4dc803228e93b4884c72949c4d28e8b8",
"reference": "d03c707d4dc803228e93b4884c72949c4d28e8b8",
"shasum": ""
},
"type": "library",
@ -5702,7 +5702,7 @@
"description": "IDE help files for Swoole.",
"support": {
"issues": "https://github.com/swoole/ide-helper/issues",
"source": "https://github.com/swoole/ide-helper/tree/4.8.3"
"source": "https://github.com/swoole/ide-helper/tree/4.8.5"
},
"funding": [
{
@ -5714,7 +5714,7 @@
"type": "github"
}
],
"time": "2021-12-01T08:11:40+00:00"
"time": "2021-12-24T22:44:20+00:00"
},
{
"name": "symfony/console",