1
0
Fork 0
mirror of synced 2024-06-30 12:10:51 +12:00

WIP: Log streaming

This commit is contained in:
Matej Bačo 2023-06-13 08:26:36 +02:00
parent be13722ad0
commit 039bfbaf06
8 changed files with 110 additions and 48 deletions

View file

@ -29,7 +29,7 @@ ENV VITE_APPWRITE_GROWTH_ENDPOINT=$VITE_APPWRITE_GROWTH_ENDPOINT
RUN npm ci
RUN npm run build
FROM appwrite/base:0.2.2 as final
FROM meldiron/base:0.2.2 as final
LABEL maintainer="team@appwrite.io"

View file

@ -1423,10 +1423,7 @@ App::post('/v1/functions/:functionId/executions')
path: $path,
method: $method,
headers: $headers,
commands: [
'sh', '-c',
'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '" &>/dev/null &'
]
command: 'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '" &>/dev/null &'
);
/** Update execution status */

View file

@ -1,5 +1,6 @@
<?php
use Swoole\Coroutine as Co;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
@ -8,6 +9,7 @@ use Appwrite\Utopia\Response\Model\Deployment;
use Executor\Executor;
use Appwrite\Usage\Stats;
use Appwrite\Vcs\Comment;
use Swoole\Runtime;
use Utopia\Database\DateTime;
use Utopia\App;
use Utopia\CLI\Console;
@ -271,21 +273,49 @@ class BuildsV1 extends Worker
$command = \str_replace('"', '\\"', $command);
$response = $this->executor->createRuntime(
projectId: $project->getId(),
deploymentId: $deployment->getId(),
source: $source,
version: $function->getAttribute('version'),
image: $runtime['image'],
remove: true,
entrypoint: $deployment->getAttribute('entrypoint'),
destination: APP_STORAGE_BUILDS . "/app-{$project->getId()}",
variables: $vars,
commands: [
'sh', '-c',
'tar -zxf /tmp/code.tar.gz -C /mnt/code && helpers/build.sh "' . $command . '"'
],
);
\var_dump("Before");
$response = null;
Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL);
Co\run(function () use ($project, $deployment, &$response, $source, $function, $runtime, $vars, $command, &$build, $dbForProject) {
Co::join([
Co\go(function () use ($project, $deployment, &$response, &$build, $dbForProject) {
\var_dump("Get logs");
$this->executor->getLogs(
projectId: $project->getId(),
deploymentId: $deployment->getId(),
callback: function ($logs) use (&$response, &$build, $dbForProject) {
\var_dump("Callback");
if($response === null) {
$build = $build->setAttribute('stdout', $build->getAttribute('stdout', '') . $logs);
$build = $dbForProject->updateDocument('builds', $build->getId(), $build);
\var_dump("Writing" . $logs);
}
}
);
}),
Co\go(function () use (&$response, $project, $deployment, $source, $function, $runtime, $vars, $command) {
\var_dump("Create runtime start");
$response = $this->executor->createRuntime(
projectId: $project->getId(),
deploymentId: $deployment->getId(),
source: $source,
version: $function->getAttribute('version'),
image: $runtime['image'],
remove: true,
entrypoint: $deployment->getAttribute('entrypoint'),
destination: APP_STORAGE_BUILDS . "/app-{$project->getId()}",
variables: $vars,
command: 'tar -zxf /tmp/code.tar.gz -C /mnt/code && helpers/build.sh "' . $command . '"'
);
\var_dump("Create runtime End");
})
]);
});
\var_dump("After");
$endTime = DateTime::now();
@ -385,10 +415,6 @@ class BuildsV1 extends Worker
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
$commentId = $deployment->getAttribute('vcsCommentId', '');
if (empty($commentId)) {
return;
}
if (!empty($SHA)) {
$message = match ($status) {
'ready' => 'Build succeeded.',
@ -412,10 +438,13 @@ class BuildsV1 extends Worker
$github->updateCommitStatus($repositoryName, $SHA, $owner, $state, $message, $targetUrl, $name);
}
$comment = new Comment();
$comment->parseComment($github->getComment($owner, $repositoryName, $commentId));
$comment->addBuild($project, $function, $status, $deployment->getId());
$github->updateComment($owner, $repositoryName, $commentId, $comment->generateComment());
if (!empty($commentId)) {
$comment = new Comment();
$comment->parseComment($github->getComment($owner, $repositoryName, $commentId));
$comment->addBuild($project, $function, $status, $deployment->getId());
$github->updateComment($owner, $repositoryName, $commentId, $comment->generateComment());
}
}
public function shutdown(): void

View file

@ -182,10 +182,7 @@ Server::setResource('execute', function () {
path: $path,
method: $method,
headers: $headers,
commands: [
'sh', '-c',
'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '" &>/dev/null &'
]
command: 'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '" &>/dev/null &'
);
$status = $executionResponse['statusCode'] >= 400 ? 'failed' : 'completed';

View file

@ -715,7 +715,7 @@ services:
hostname: exc1
<<: *x-logging
stop_signal: SIGINT
image: meldiron/executor:0.3.1
image: meldiron/executor:0.3.2
networks:
- appwrite
- runtimes

View file

@ -74,13 +74,13 @@ class Deployment extends Model
'type' => self::TYPE_STRING,
'description' => 'The build stdout.',
'default' => '',
'example' => 'enabled',
'example' => 'Compiling source files...',
])
->addRule('buildStderr', [
'type' => self::TYPE_STRING,
'description' => 'The build stderr.',
'default' => '',
'example' => 'enabled',
'example' => 'File index.ts not found!',
])
->addRule('buildTime', [
'type' => self::TYPE_INTEGER,

View file

@ -60,7 +60,7 @@ class Rule extends Model
'default' => '',
'example' => 'HTTP challegne failed.',
])
->addRule('renewDate', [
->addRule('renewAt', [
'type' => self::TYPE_DATETIME,
'description' => 'Certificate auto-renewal date in ISO 8601 format.',
'default' => APP_DATABASE_ATTRIBUTE_DATETIME,

View file

@ -57,7 +57,7 @@ class Executor
* @param string $entrypoint
* @param string $destination
* @param array $variables
* @param array $commands
* @param string $command
*/
public function createRuntime(
string $deploymentId,
@ -69,8 +69,7 @@ class Executor
string $entrypoint = '',
string $destination = '',
array $variables = [],
array $commands = null,
array $startCommands = null,
string $command = null,
) {
$runtimeId = "$projectId-$deploymentId";
$route = "/runtimes";
@ -82,8 +81,7 @@ class Executor
'entrypoint' => $entrypoint,
'variables' => $variables,
'remove' => $remove,
'commands' => $commands,
'startCommands' => $startCommands,
'command' => $command,
'cpus' => $this->cpus,
'memory' => $this->memory,
'version' => $version,
@ -102,6 +100,31 @@ class Executor
return $response['body'];
}
/**
*
*
* Listen to realtime logs stream of a runtime
*
* @param string $deploymentId
* @param string $projectId
* @param callable $callback
*/
public function getLogs(
string $deploymentId,
string $projectId,
callable $callback
) {
$timeout = (int) App::getEnv('_APP_FUNCTIONS_BUILD_TIMEOUT', 900);
$runtimeId = "$projectId-$deploymentId";
$route = "/runtimes/{$runtimeId}/logs";
$params = [
'timeout' => $timeout
];
$this->call(self::METHOD_GET, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout, $callback);
}
/**
* Delete Runtime
*
@ -139,7 +162,7 @@ class Executor
* @param string $image
* @param string $source
* @param string $entrypoint
* @param array $commands
* @param string $command
*
* @return array
*/
@ -156,8 +179,7 @@ class Executor
string $path,
string $method,
array $headers,
array $commands = null,
array $startCommands = null
string $command = null,
) {
$headers['host'] = App::getEnv('_APP_DOMAIN', '');
@ -178,8 +200,7 @@ class Executor
'cpus' => $this->cpus,
'memory' => $this->memory,
'version' => $version,
'commands' => $commands,
'startCommands' => $startCommands
'command' => $command,
];
$timeout = (int) App::getEnv('_APP_FUNCTIONS_BUILD_TIMEOUT', 900);
@ -208,7 +229,7 @@ class Executor
* @return array|string
* @throws Exception
*/
public function call(string $method, string $path = '', array $headers = [], array $params = [], bool $decode = true, int $timeout = 15)
public function call(string $method, string $path = '', array $headers = [], array $params = [], bool $decode = true, int $timeout = 15, callable $callback = null)
{
$headers = array_merge($this->headers, $headers);
$ch = curl_init($this->endpoint . $path . (($method == self::METHOD_GET && !empty($params)) ? '?' . http_build_query($params) : ''));
@ -236,8 +257,20 @@ class Executor
unset($headers[$i]);
}
if (isset($callback)) {
$headers[] = 'accept: text/event-stream';
$handleEvent = function ($ch, $data) use ($callback) {
$callback($data);
return \strlen($data);
};
curl_setopt($ch, CURLOPT_WRITEFUNCTION, $handleEvent);
} else {
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
}
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true);
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 0);
@ -266,6 +299,12 @@ class Executor
}
$responseBody = curl_exec($ch);
if (isset($callback)) {
curl_close($ch);
return [];
}
$responseType = $responseHeaders['content-type'] ?? '';
$responseStatus = curl_getinfo($ch, CURLINFO_HTTP_CODE);