1
0
Fork 0
mirror of synced 2024-06-26 18:20:43 +12:00

feat: check async execution

This commit is contained in:
Christy Jacob 2022-11-16 09:47:46 +05:30
parent 6bf370a058
commit 4a92db4dc3
9 changed files with 352 additions and 68 deletions

View file

@ -1217,10 +1217,10 @@ App::post('/v1/functions/:functionId/executions')
// TODO revise this later using route label
$usage
->setParam('functionId', $function->getId())
->setParam('executions.{scope}.compute', 1)
->setParam('executionStatus', $execution->getAttribute('status', ''))
->setParam('executionTime', $execution->getAttribute('duration')); // ms
->setParam('functionId', $function->getId())
->setParam('executions.{scope}.compute', 1)
->setParam('executionStatus', $execution->getAttribute('status', ''))
->setParam('executionTime', $execution->getAttribute('duration')); // ms
$roles = Authorization::getRoles();
$isPrivilegedUser = Auth::isPrivilegedUser($roles);

View file

@ -20,37 +20,37 @@ Server::setResource('register', fn() => $register);
Server::setResource('dbForConsole', function (Cache $cache, Registry $register) {
$pools = $register->get('pools');
$dbAdapter = $pools
$database = $pools
->get('console')
->pop()
->getResource()
;
$database = new Database($dbAdapter, $cache);
$database->setNamespace('console');
$adapter = new Database($database, $cache);
$adapter->setNamespace('console');
return $database;
return $adapter;
}, ['cache', 'register']);
Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole) {
$args = $message->getPayload() ?? [];
$project = new Document($args['project'] ?? []);
$payload = $message->getPayload() ?? [];
$project = new Document($payload['project'] ?? []);
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$pools = $register->get('pools');
$dbAdapter = $pools
$database = $pools
->get($project->getAttribute('database'))
->pop()
->getResource()
;
$database = new Database($dbAdapter, $cache);
$database->setNamespace('_' . $project->getInternalId());
$adapter = new Database($database, $cache);
$adapter->setNamespace('_' . $project->getInternalId());
return $database;
return $adapter;
}, ['cache', 'register', 'message', 'dbForConsole']);
Server::setResource('cache', function (Registry $register) {
@ -71,19 +71,26 @@ Server::setResource('cache', function (Registry $register) {
Server::setResource('functions', function (Registry $register) {
$pools = $register->get('pools');
return new Func($pools->get('queue')->pop()->getResource());
return new Func(
$pools
->get('queue')
->pop()
->getResource()
);
}, ['register']);
Server::setResource('logger', function ($register) {
return $register->get('logger');
}, ['register']);
Server::setResource('statsd', function ($register) {
return $register->get('statsd');
}, ['register']);
$pools = $register->get('pools');
$connection = $pools->get('queue')->pop()->getResource();
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
$workerNumber = 1;
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);

View file

@ -2,13 +2,13 @@
require_once __DIR__ . '/../worker.php';
use Utopia\Queue;
use Utopia\Queue\Message;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Usage\Stats;
use Appwrite\Utopia\Response\Model\Execution;
use Domnikl\Statsd\Client;
use Executor\Executor;
use Utopia\App;
use Utopia\CLI\Console;
@ -21,14 +21,16 @@ use Utopia\Database\Query;
use Utopia\Database\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Logger\Log;
use Utopia\Queue\Adapter\Swoole;
use Utopia\Queue\Server;
Authorization::disable();
Authorization::setDefaultStatus(false);
global $connection;
global $workerNumber;
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
$adapter = new Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME);
$server = new Server($adapter);
$execute = function (
Document $project,
@ -41,12 +43,14 @@ $execute = function (
string $eventData = null,
string $data = null,
?Document $user = null,
string $jwt = null
) use ($executor, $register) {
string $jwt = null,
Client $statsd
) {
$user ??= new Document();
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
var_dump("Deployment ID : ", $deploymentId);
/** Check if deployment exists */
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
@ -163,6 +167,7 @@ $execute = function (
]);
/** Execute function */
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
try {
$executionResponse = $executor->createExecution(
projectId: $project->getId(),
@ -223,7 +228,7 @@ $execute = function (
'executionId' => $execution->getId()
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution
);
@ -243,12 +248,11 @@ $execute = function (
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);
$usage
->setParam('projectId', $project->getId())
->setParam('projectInternalId', $project->getInternalId())
->setParam('functionId', $function->getId())
->setParam('executions.{scope}.compute', 1)
->setParam('executionStatus', $execution->getAttribute('status', ''))
@ -259,21 +263,25 @@ $execute = function (
}
};
$adapter = new Queue\Adapter\Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME);
$server = new Queue\Server($adapter);
$server->job()
->inject('message')
->inject('dbForProject')
->inject('functions')
->action(function (Message $message, Database $dbForProject, Func $functions) use ($execute) {
$args = $message->getPayload() ?? [];
$type = $args['type'] ?? '';
$events = $args['events'] ?? [];
$project = new Document($args['project'] ?? []);
$user = new Document($args['user'] ?? []);
// Where $payload comes from
$payload = json_encode($args['payload'] ?? []);
->inject('statsd')
->action(function (Message $message, Database $dbForProject, Func $functions, Client $statsd) use ($execute) {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
var_dump(json_encode($payload));
$type = $payload['type'] ?? '';
$events = $payload['events'] ?? [];
$project = new Document($payload['project'] ?? []);
$data = $payload['data'] ?? '';
$user = new Document($payload['user'] ?? []);
if ($project->getId() === 'console') {
return;
@ -284,38 +292,31 @@ $server->job()
*/
if (!empty($events)) {
$limit = 30;
$sum = $limit;
$total = 0;
$latestDocument = null;
$sum = 30;
$offset = 0;
$functions = [];
/** @var Document[] $functions */
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
if ($latestDocument !== null) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$results = $dbForProject->find('functions', \array_merge($paginationQueries, [
Query::orderAsc('name')
]));
while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('name'),
]);
$sum = count($results);
$total = $total + $sum;
$sum = \count($functions);
$offset = $offset + $limit;
Console::log('Fetched ' . $sum . ' functions...');
foreach ($results as $function) {
foreach ($functions as $function) {
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
}
Console::success('Iterating function: ' . $function->getAttribute('name'));
// As event, pass first, most verbose event pattern
call_user_func($execute, $project, $function, $dbForProject, 'event', null, $events[0], $payload, null, $user, null);
call_user_func($execute, $project, $function, $dbForProject, 'event', null, $events[0], $payload, null, $user, null, $statsd);
Console::success('Triggered function: ' . $events[0]);
}
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
}
return;
@ -324,20 +325,18 @@ $server->job()
/**
* Handle Schedule and HTTP execution.
*/
$project = new Document($args['project'] ?? []);
$function = new Document($args['function'] ?? []);
$function = new Document($payload['function'] ?? []);
var_dump($function);
switch ($type) {
case 'http':
$jwt = $args['jwt'] ?? '';
$data = $args['data'] ?? '';
$execution = new Document($args['execution'] ?? []);
$user = new Document($args['user'] ?? []);
// $function = $dbForProject->getDocument('functions', $execution->getAttribute('functionId'));
call_user_func($execute, $project, $function, $dbForProject, $functions, 'http', $execution->getId(), null, null, $data, $user, $jwt);
$jwt = $payload['jwt'] ?? '';
$execution = new Document($payload['execution'] ?? []);
$user = new Document($payload['user'] ?? []);
call_user_func($execute, $project, $function, $dbForProject, $functions, 'http', $execution->getId(), null, null, $data, $user, $jwt, $statsd);
break;
case 'schedule':
call_user_func($execute, $project, $function, $dbForProject, $functions, 'schedule', null, null, null, null, null, null);
call_user_func($execute, $project, $function, $dbForProject, $functions, 'schedule', null, null, null, null, null, null, $statsd);
break;
}
});

2
composer.lock generated
View file

@ -5311,5 +5311,5 @@
"platform-overrides": {
"php": "8.0"
},
"plugin-api-version": "2.1.0"
"plugin-api-version": "2.3.0"
}

21
temp/appwrite.json Normal file
View file

@ -0,0 +1,21 @@
{
"projectId": "6374484424b42c83155c",
"projectName": "Default",
"functions": [
{
"$id": "637449ac7066fa0d8cde",
"name": "My Awesome Function",
"runtime": "node-14.5",
"path": "functions/My Awesome Function",
"entrypoint": "src/index.js",
"ignore": [
"node_modules",
".npm"
],
"execute": [],
"events": [],
"schedule": "",
"timeout": 15
}
]
}

View file

@ -0,0 +1,149 @@
# Created by https://www.toptal.com/developers/gitignore/api/node
# Edit at https://www.toptal.com/developers/gitignore?templates=node
### Node ###
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# Snowpack dependency directory (https://snowpack.dev/)
web_modules/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional stylelint cache
.stylelintcache
# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variable files
.env
.env.development.local
.env.test.local
.env.production.local
.env.local
# parcel-bundler cache (https://parceljs.org/)
.cache
.parcel-cache
# Next.js build output
.next
out
# Nuxt.js build / generate output
.nuxt
dist
# Gatsby files
.cache/
# Comment in the public line in if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# vuepress v2.x temp and cache directory
.temp
# Docusaurus cache and generated files
.docusaurus
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# TernJS port file
.tern-port
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
# yarn v2
.yarn/cache
.yarn/unplugged
.yarn/build-state.yml
.yarn/install-state.gz
.pnp.*
### Node Patch ###
# Serverless Webpack directories
.webpack/
# Optional stylelint cache
# SvelteKit build / generate output
.svelte-kit
# End of https://www.toptal.com/developers/gitignore/api/node
# OS
## Mac
.DS_Store

View file

@ -0,0 +1,47 @@
# My Awesome Function
Welcome to the documentation of this function 👋 We strongly recommend keeping this file in sync with your function's logic to make sure anyone can easily understand your function in the future. If you don't need documentation, you can remove this file.
## 🤖 Documentation
Simple function similar to typical "hello world" example, but instead, we return a simple JSON that tells everyone how awesome developers are.
<!-- Update with your description, for example 'Create Stripe payment and return payment URL' -->
_Example input:_
This function expects no input
<!-- If input is expected, add example -->
_Example output:_
<!-- Update with your expected output -->
```json
{
"areDevelopersAwesome": true
}
```
## 📝 Environment Variables
List of environment variables used by this cloud function:
- **APPWRITE_FUNCTION_ENDPOINT** - Endpoint of Appwrite project
- **APPWRITE_FUNCTION_API_KEY** - Appwrite API Key
<!-- Add your custom environment variables -->
## 🚀 Deployment
There are two ways of deploying the Appwrite function, both having the same results, but each using a different process. We highly recommend using CLI deployment to achieve the best experience.
### Using CLI
Make sure you have [Appwrite CLI](https://appwrite.io/docs/command-line#installation) installed, and you have successfully logged into your Appwrite server. To make sure Appwrite CLI is ready, you can use the command `appwrite client --debug` and it should respond with green text `✓ Success`.
Make sure you are in the same folder as your `appwrite.json` file and run `appwrite deploy function` to deploy your function. You will be prompted to select which functions you want to deploy.
### Manual using tar.gz
Manual deployment has no requirements and uses Appwrite Console to deploy the tag. First, enter the folder of your function. Then, create a tarball of the whole folder and gzip it. After creating `.tar.gz` file, visit Appwrite Console, click on the `Deploy Tag` button and switch to the `Manual` tab. There, set the `entrypoint` to `src/index.js`, and upload the file we just generated.

View file

@ -0,0 +1,15 @@
{
"name": "appwrite-function",
"version": "1.0.0",
"description": "",
"main": "src/index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"node-appwrite": "^8.0.0"
}
}

View file

@ -0,0 +1,46 @@
const sdk = require("node-appwrite");
/*
'req' variable has:
'headers' - object with request headers
'payload' - request body data as a string
'variables' - object with function variables
'res' variable has:
'send(text, status)' - function to return text response. Status code defaults to 200
'json(obj, status)' - function to return JSON response. Status code defaults to 200
If an error is thrown, a response with code 500 will be returned.
*/
module.exports = async function (req, res) {
const client = new sdk.Client();
// You can remove services you don't use
const account = new sdk.Account(client);
const avatars = new sdk.Avatars(client);
const database = new sdk.Databases(client);
const functions = new sdk.Functions(client);
const health = new sdk.Health(client);
const locale = new sdk.Locale(client);
const storage = new sdk.Storage(client);
const teams = new sdk.Teams(client);
const users = new sdk.Users(client);
if (
!req.variables['APPWRITE_FUNCTION_ENDPOINT'] ||
!req.variables['APPWRITE_FUNCTION_API_KEY']
) {
console.warn("Environment variables are not set. Function cannot use Appwrite SDK.");
} else {
client
.setEndpoint(req.variables['APPWRITE_FUNCTION_ENDPOINT'])
.setProject(req.variables['APPWRITE_FUNCTION_PROJECT_ID'])
.setKey(req.variables['APPWRITE_FUNCTION_API_KEY'])
.setSelfSigned(true);
}
res.json({
areDevelopersAwesome: true,
});
};