2020-05-05 01:34:31 +12:00
< ? php
2020-07-17 00:04:06 +12:00
2022-02-06 08:49:57 +13:00
use Appwrite\Event\Event ;
2022-04-18 08:34:32 +12:00
use Appwrite\Event\Func ;
2022-02-06 08:49:57 +13:00
use Appwrite\Messaging\Adapter\Realtime ;
2021-06-12 02:20:18 +12:00
use Appwrite\Resque\Worker ;
2022-08-09 18:28:38 +12:00
use Appwrite\Usage\Stats ;
2022-02-06 08:49:57 +13:00
use Appwrite\Utopia\Response\Model\Execution ;
2021-01-17 12:38:13 +13:00
use Cron\CronExpression ;
2022-02-06 08:49:57 +13:00
use Executor\Executor ;
2020-07-20 02:43:59 +12:00
use Utopia\App ;
2020-05-10 10:12:00 +12:00
use Utopia\CLI\Console ;
2020-05-05 01:34:31 +12:00
use Utopia\Config\Config ;
2021-05-05 09:45:41 +12:00
use Utopia\Database\Database ;
2022-07-13 01:32:39 +12:00
use Utopia\Database\DateTime ;
2021-05-05 09:45:41 +12:00
use Utopia\Database\Document ;
2022-12-15 04:42:25 +13:00
use Utopia\Database\Helpers\ID ;
2022-12-15 05:04:06 +13:00
use Utopia\Database\Helpers\Permission ;
2022-08-11 01:43:05 +12:00
use Utopia\Database\Query ;
2022-12-15 05:04:06 +13:00
use Utopia\Database\Helpers\Role ;
2020-05-09 18:26:18 +12:00
2022-04-14 00:39:31 +12:00
// Load the shared bootstrap file that sits one directory above this script.
// NOTE(review): APP_NAME is not defined in this file — presumably init.php provides it; confirm.
require_once __DIR__ . '/../init.php';

// Set the console title, then announce that the worker has started.
Console::title('Functions V1 Worker');
Console::success(APP_NAME . ' functions worker v1 has started');
2020-05-10 04:39:50 +12:00
2021-06-12 02:20:18 +12:00
/**
 * Worker that executes functions on behalf of a project.
 *
 * A job is described by $this->args and is handled in one of three ways:
 *  - a non-empty 'events' list: every function subscribed to at least one of
 *    the events is executed with trigger 'event';
 *  - type 'http': the function referenced by the supplied 'execution'
 *    document is executed with trigger 'http';
 *  - type 'schedule': the function is queued again for its next cron date and
 *    then executed with trigger 'schedule'.
 *
 * Jobs whose project ID is 'console' are ignored.
 */
class FunctionsV1 extends Worker
{
    /** Executor client; created in init() from the _APP_EXECUTOR_HOST environment variable. */
    private ?Executor $executor = null;

    /**
     * Job arguments read by run().
     * NOTE(review): presumably populated by the queue runtime before run() is called — confirm.
     */
    public array $args = [];

    /**
     * Not read by any code in this class.
     * NOTE(review): presumably consumed by the Worker base class — confirm.
     */
    public array $allowed = [];

    /**
     * Name identifying this worker.
     *
     * @return string
     */
    public function getName(): string
    {
        return "functions";
    }

    /**
     * Create the executor client used by execute().
     *
     * @return void
     */
    public function init(): void
    {
        $this->executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
    }

    /**
     * Process one job described by $this->args.
     *
     * Reads 'type', 'events', 'project', 'user' and 'payload', plus
     * 'execution', 'function', 'jwt' and 'data' for non-event jobs.
     * Event jobs take precedence over the 'type' argument. Any other type is
     * silently ignored.
     *
     * @return void
     * @throws Exception When a scheduled function no longer exists, or when
     *                   execute() rejects the function (see execute()).
     */
    public function run(): void
    {
        $type = $this->args['type'] ?? '';
        $events = $this->args['events'] ?? [];
        $project = new Document($this->args['project'] ?? []);
        $user = new Document($this->args['user'] ?? []);
        $payload = json_encode($this->args['payload'] ?? []);

        // Nothing is executed for the console project.
        if ($project->getId() === 'console') {
            return;
        }

        $database = $this->getProjectDB($project->getId());

        /**
         * Handle Event execution.
         */
        if (!empty($events)) {
            $this->runForEvents($events, $payload, $project, $user, $database);

            return;
        }

        /**
         * Handle Schedule and HTTP execution.
         */
        $execution = new Document($this->args['execution'] ?? []);
        $function = new Document($this->args['function'] ?? []);

        switch ($type) {
            case 'http':
                $jwt = $this->args['jwt'] ?? '';
                $data = $this->args['data'] ?? '';

                // The function is resolved from the execution document, not from the 'function' argument.
                $function = $database->getDocument('functions', $execution->getAttribute('functionId'));

                $this->execute(
                    project: $project,
                    function: $function,
                    dbForProject: $database,
                    executionId: $execution->getId(),
                    trigger: 'http',
                    data: $data,
                    user: $user,
                    jwt: $jwt
                );

                break;
            case 'schedule':
                $this->runScheduled($function, $project, $user, $database);

                break;
        }
    }

    /**
     * Execute every function subscribed to at least one of the given events.
     *
     * Functions are fetched in pages of 30, ordered by name, until a page
     * comes back with fewer than 30 entries.
     *
     * @param array    $events   Event patterns of the job; the first entry is passed to the function.
     * @param mixed    $payload  JSON-encoded event payload as produced by json_encode() in run().
     * @param Document $project  Project the functions belong to.
     * @param Document $user     User attached to the job (may be an empty document).
     * @param Database $database Database handle of the project.
     *
     * @return void
     * @throws Exception Propagated from execute(); aborts the remaining functions.
     */
    private function runForEvents(array $events, $payload, Document $project, Document $user, Database $database): void
    {
        $limit = 30;
        $offset = 0;

        do {
            /** @var Document[] $functions */
            $functions = $database->find('functions', [
                Query::limit($limit),
                Query::offset($offset),
                Query::orderAsc('name'),
            ]);

            $sum = \count($functions);
            $offset = $offset + $limit;

            Console::log('Fetched ' . $sum . ' functions...');

            foreach ($functions as $function) {
                // Skip functions that are not subscribed to any of the job's events.
                if (!array_intersect($events, $function->getAttribute('events', []))) {
                    continue;
                }

                Console::success('Iterating function: ' . $function->getAttribute('name'));

                $this->execute(
                    project: $project,
                    function: $function,
                    dbForProject: $database,
                    trigger: 'event',
                    // Pass first, most verbose event pattern
                    event: $events[0],
                    eventData: $payload,
                    user: $user
                );

                Console::success('Triggered function: ' . $events[0]);
            }
        } while ($sum >= $limit);
    }

    /**
     * Run one scheduled execution and queue the following one.
     *
     * Steps:
     *  1. Reload the function from the database.
     *  2. Drop the run if 'schedule' or 'scheduleUpdatedAt' differs from the
     *     copy carried by the job (the schedule was edited after queuing).
     *  3. Store the next and previous run dates on the function.
     *  4. Queue a new 'schedule' job for the next run date.
     *  5. Execute the function.
     *
     * @param Document $scheduled Function document as carried by the job.
     * @param Document $project   Project the function belongs to.
     * @param Document $user      User attached to the job (may be an empty document).
     * @param Database $database  Database handle of the project.
     *
     * @return void
     * @throws Exception When the function cannot be found, or propagated from execute().
     */
    private function runScheduled(Document $scheduled, Document $project, Document $user, Database $database): void
    {
        $function = $database->getDocument('functions', $scheduled->getId());

        if (empty($function->getId())) {
            // Report the ID that was looked up; the fetched document is empty and has none.
            throw new Exception('Function not found (' . $scheduled->getId() . ')');
        }

        // Schedule has changed from previous run, ignore this run.
        if ($scheduled->getAttribute('schedule') !== $function->getAttribute('schedule')) {
            return;
        }

        // Double execution due to rapid cron changes, ignore this run.
        if ($scheduled->getAttribute('scheduleUpdatedAt') !== $function->getAttribute('scheduleUpdatedAt')) {
            return;
        }

        $cron = new CronExpression($function->getAttribute('schedule'));
        $next = DateTime::format($cron->getNextRunDate());

        $function = $function
            ->setAttribute('scheduleNext', $next)
            ->setAttribute('schedulePrevious', DateTime::now());

        $function = $database->updateDocument(
            'functions',
            $function->getId(),
            $function
        );

        // Queue the next run before executing, so a failing execution does not stop the schedule.
        $reschedule = new Func();
        $reschedule
            ->setFunction($function)
            ->setType('schedule')
            ->setUser($user)
            ->setProject($project)
            ->schedule(new \DateTime($next));

        $this->execute(
            project: $project,
            function: $function,
            dbForProject: $database,
            trigger: 'schedule'
        );
    }

    /**
     * Execute a function through the executor and publish the outcome.
     *
     * Validates the deployment, build and runtime, creates the execution
     * document when none exists for $executionId, runs the function, stores
     * the result, then triggers the webhook, functions and realtime events and
     * (when enabled) submits usage stats. A failure of the executor call is
     * recorded on the execution document instead of being rethrown.
     *
     * @param Document      $project      Project the function belongs to.
     * @param Document      $function     Function to execute.
     * @param Database      $dbForProject Database handle of the project.
     * @param string        $trigger      Stored as 'trigger' on a newly created execution and exposed as APPWRITE_FUNCTION_TRIGGER ('event', 'http' or 'schedule' in this class).
     * @param string|null   $executionId  Existing execution to update; a new one is created when null or unknown.
     * @param string|null   $event        Exposed as APPWRITE_FUNCTION_EVENT.
     * @param string|null   $eventData    Exposed as APPWRITE_FUNCTION_EVENT_DATA.
     * @param string|null   $data         Exposed as APPWRITE_FUNCTION_DATA and passed as execution data.
     * @param Document|null $user         User the execution runs for; grants that user read permission on a new execution.
     * @param string|null   $jwt          Exposed as APPWRITE_FUNCTION_JWT.
     *
     * @return void
     * @throws Exception 404 when the deployment or build is missing, 400 when
     *                   the build is not ready or the runtime is unsupported.
     */
    private function execute(
        Document $project,
        Document $function,
        Database $dbForProject,
        string $trigger,
        ?string $executionId = null,
        ?string $event = null,
        ?string $eventData = null,
        ?string $data = null,
        ?Document $user = null,
        ?string $jwt = null
    ): void {
        $user ??= new Document();
        $functionId = $function->getId();
        $deploymentId = $function->getAttribute('deployment', '');

        /** Check if deployment exists and belongs to this function */
        $deployment = $dbForProject->getDocument('deployments', $deploymentId);

        if ($deployment->isEmpty() || $deployment->getAttribute('resourceId') !== $functionId) {
            throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
        }

        /** Check if build exists and is ready */
        $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));

        if ($build->isEmpty()) {
            throw new Exception('Build not found', 404);
        }

        if ($build->getAttribute('status') !== 'ready') {
            throw new Exception('Build not ready', 400);
        }

        /** Check if runtime is supported */
        $runtimes = Config::getParam('runtimes', []);

        if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
            throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
        }

        $runtime = $runtimes[$function->getAttribute('runtime')];

        /** Create execution or update execution status */
        $execution = $dbForProject->getDocument('executions', $executionId ?? '');

        if ($execution->isEmpty()) {
            $executionId = ID::unique();
            $execution = $dbForProject->createDocument('executions', new Document([
                '$id' => $executionId,
                '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))],
                'functionId' => $functionId,
                'deploymentId' => $deploymentId,
                'trigger' => $trigger,
                'status' => 'waiting',
                'statusCode' => 0,
                'response' => '',
                'stderr' => '',
                'duration' => 0.0,
                'search' => implode(' ', [$functionId, $executionId]),
            ]));

            if ($execution->isEmpty()) {
                throw new Exception('Failed to create or read execution');
            }
        }

        $execution->setAttribute('status', 'processing');
        $execution = $dbForProject->updateDocument('executions', $executionId, $execution);

        /** Flatten the function's variable documents into a key => value map */
        $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
            $carry[$var->getAttribute('key')] = $var->getAttribute('value');

            return $carry;
        }, []);

        /** Collect environment variables; the built-in ones override same-named function variables */
        $vars = \array_merge($vars, [
            'APPWRITE_FUNCTION_ID' => $functionId,
            'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'),
            'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId,
            'APPWRITE_FUNCTION_TRIGGER' => $trigger,
            'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(),
            'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '',
            'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '',
            'APPWRITE_FUNCTION_EVENT' => $event ?? '',
            'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '',
            'APPWRITE_FUNCTION_DATA' => $data ?? '',
            'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '',
            'APPWRITE_FUNCTION_JWT' => $jwt ?? '',
        ]);

        /** Execute function */
        try {
            $executionResponse = $this->executor->createExecution(
                projectId: $project->getId(),
                deploymentId: $deploymentId,
                path: $build->getAttribute('outputPath', ''),
                vars: $vars,
                entrypoint: $deployment->getAttribute('entrypoint', ''),
                data: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
                runtime: $function->getAttribute('runtime', ''),
                timeout: $function->getAttribute('timeout', 0),
                baseImage: $runtime['image']
            );

            /** Update execution status */
            $execution
                ->setAttribute('status', $executionResponse['status'])
                ->setAttribute('statusCode', $executionResponse['statusCode'])
                ->setAttribute('response', $executionResponse['response'])
                ->setAttribute('stdout', $executionResponse['stdout'])
                ->setAttribute('stderr', $executionResponse['stderr'])
                ->setAttribute('duration', $executionResponse['duration']);
        } catch (\Throwable $th) {
            // Deliberately not rethrown: the failure is stored on the execution
            // so the events and stats below are still emitted for it.
            $execution
                ->setAttribute('duration', $this->secondsSince($execution->getCreatedAt()))
                ->setAttribute('status', 'failed')
                ->setAttribute('statusCode', $th->getCode())
                ->setAttribute('stderr', $th->getMessage());

            Console::error($th->getMessage());
        }

        $execution = $dbForProject->updateDocument('executions', $executionId, $execution);

        /** Trigger Webhook */
        $executionModel = new Execution();
        $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME);
        $executionUpdate
            ->setProject($project)
            ->setUser($user)
            ->setEvent('functions.[functionId].executions.[executionId].update')
            ->setParam('functionId', $function->getId())
            ->setParam('executionId', $execution->getId())
            ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
            ->trigger();

        /** Trigger Functions (same event, re-targeted at the functions queue) */
        $executionUpdate
            ->setClass(Event::FUNCTIONS_CLASS_NAME)
            ->setQueue(Event::FUNCTIONS_QUEUE_NAME)
            ->trigger();

        /** Trigger realtime event */
        $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
            'functionId' => $function->getId(),
            'executionId' => $execution->getId()
        ]);

        $target = Realtime::fromPayload(
            // Pass first, most verbose event pattern
            event: $allEvents[0],
            payload: $execution
        );

        // The same update is sent twice: once for the 'console' project and once for the owning project.
        Realtime::send(
            projectId: 'console',
            payload: $execution->getArrayCopy(),
            events: $allEvents,
            channels: $target['channels'],
            roles: $target['roles']
        );

        Realtime::send(
            projectId: $project->getId(),
            payload: $execution->getArrayCopy(),
            events: $allEvents,
            channels: $target['channels'],
            roles: $target['roles']
        );

        /** Update usage stats */
        global $register;

        if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
            $statsd = $register->get('statsd');
            $usage = new Stats($statsd);
            $usage
                ->setParam('projectId', $project->getId())
                ->setParam('projectInternalId', $project->getInternalId())
                ->setParam('functionId', $function->getId())
                ->setParam('executions.{scope}.compute', 1)
                ->setParam('executionStatus', $execution->getAttribute('status', ''))
                ->setParam('executionTime', $execution->getAttribute('duration'))
                ->setParam('networkRequestSize', 0)
                ->setParam('networkResponseSize', 0)
                ->submit();
        }
    }

    /**
     * Seconds elapsed between the given date-time and now.
     *
     * Works on full timestamps, so intervals longer than a minute are not
     * truncated and microseconds keep their correct scale. A missing value is
     * treated as "now", and the result is never negative.
     *
     * @param string|null $since Date-time string accepted by the \DateTime constructor.
     *
     * @return float
     */
    private function secondsSince(?string $since): float
    {
        $start = (float) (new \DateTime($since ?? 'now'))->format('U.u');

        return \max(0.0, \microtime(true) - $start);
    }

    /**
     * Nothing to clean up when the worker stops.
     *
     * @return void
     */
    public function shutdown(): void
    {
    }
}