1
0
Fork 0
mirror of synced 2024-06-27 18:50:47 +12:00

fix: test

This commit is contained in:
loks0n 2024-06-17 13:44:12 +01:00
parent 9341c9d76b
commit 2f0f7bf9c7
13 changed files with 185 additions and 39 deletions

View file

@ -79,6 +79,7 @@ RUN chmod +x /usr/local/bin/doctor && \
chmod +x /usr/local/bin/migrate && \
chmod +x /usr/local/bin/realtime && \
chmod +x /usr/local/bin/schedule-functions && \
chmod +x /usr/local/bin/schedule-executions && \
chmod +x /usr/local/bin/schedule-messages && \
chmod +x /usr/local/bin/sdks && \
chmod +x /usr/local/bin/specs && \

View file

@ -1722,7 +1722,7 @@ App::post('/v1/functions/:functionId/executions')
'functionId' => $function->getId(),
'deploymentInternalId' => $deployment->getInternalId(),
'deploymentId' => $deployment->getId(),
'trigger' => 'http', // http / schedule / event
'trigger' => (!is_null($scheduledAt)) ? 'schedule' : 'http',
'status' => $status, // waiting / processing / completed / failed
'responseStatusCode' => 0,
'responseHeaders' => [],
@ -1764,9 +1764,9 @@ App::post('/v1/functions/:functionId/executions')
} else {
$dbForConsole->createDocument('schedules', new Document([
'region' => System::getEnv('_APP_REGION', 'default'),
'resourceType' => 'function',
'resourceId' => $function->getId(),
'resourceInternalId' => $function->getInternalId(),
'resourceType' => 'execution',
'resourceId' => $execution->getId(),
'resourceInternalId' => $execution->getInternalId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $scheduledAt,

View file

@ -676,6 +676,31 @@ services:
- _APP_DB_USER
- _APP_DB_PASS
appwrite-task-scheduler-executions:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: schedule-executions
container_name: appwrite-task-scheduler-executions
<<: *x-logging
restart: unless-stopped
networks:
- appwrite
depends_on:
- mariadb
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
appwrite-task-scheduler-messages:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: schedule-messages

3
bin/schedule-executions Normal file
View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/cli.php schedule-executions $@

View file

@ -10,8 +10,6 @@ x-logging: &x-logging
max-file: "5"
max-size: "10m"
version: "3"
services:
traefik:
image: traefik:2.11
@ -742,6 +740,33 @@ services:
- _APP_DB_USER
- _APP_DB_PASS
appwrite-task-scheduler-executions:
entrypoint: schedule-executions
<<: *x-logging
container_name: appwrite-task-scheduler-executions
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- mariadb
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
appwrite-task-scheduler-messages:
entrypoint: schedule-messages
<<: *x-logging

View file

@ -14,6 +14,7 @@ class Func extends Event
protected string $path = '';
protected string $method = '';
protected array $headers = [];
protected ?string $functionId = null;
protected ?Document $function = null;
protected ?Document $execution = null;
@ -49,6 +50,28 @@ class Func extends Event
return $this->function;
}
/**
* Sets function id for the function event.
*
* @param string $functionId
*/
public function setFunctionId(string $functionId): self
{
$this->functionId = $functionId;
return $this;
}
/**
* Returns set function id for the function event.
*
* @return string|null
*/
public function getFunctionId(): ?string
{
return $this->functionId;
}
/**
* Sets execution for the function event.
*
@ -200,6 +223,7 @@ class Func extends Event
'project' => $this->project,
'user' => $this->user,
'function' => $this->function,
'functionId' => $this->functionId,
'execution' => $this->execution,
'type' => $this->type,
'jwt' => $this->jwt,

View file

@ -9,6 +9,7 @@ use Appwrite\Platform\Tasks\Migrate;
use Appwrite\Platform\Tasks\QueueCount;
use Appwrite\Platform\Tasks\QueueRetry;
use Appwrite\Platform\Tasks\ScheduleFunctions;
use Appwrite\Platform\Tasks\ScheduleExecutions;
use Appwrite\Platform\Tasks\ScheduleMessages;
use Appwrite\Platform\Tasks\SDKs;
use Appwrite\Platform\Tasks\Specs;
@ -33,6 +34,7 @@ class Tasks extends Service
->addAction(SDKs::getName(), new SDKs())
->addAction(SSL::getName(), new SSL())
->addAction(ScheduleFunctions::getName(), new ScheduleFunctions())
->addAction(ScheduleExecutions::getName(), new ScheduleExecutions())
->addAction(ScheduleMessages::getName(), new ScheduleMessages())
->addAction(Specs::getName(), new Specs())
->addAction(Upgrade::getName(), new Upgrade())

View file

@ -64,7 +64,8 @@ abstract class ScheduleBase extends Action
$collectionId = match ($schedule->getAttribute('resourceType')) {
'function' => 'functions',
'message' => 'messages'
'message' => 'messages',
'execution' => 'executions'
};
$resource = $getProjectDB($project)->getDocument(
@ -113,7 +114,8 @@ abstract class ScheduleBase extends Action
} catch (\Throwable $th) {
$collectionId = match ($document->getAttribute('resourceType')) {
'function' => 'functions',
'message' => 'messages'
'message' => 'messages',
'execution' => 'executions'
};
Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}");

View file

@ -0,0 +1,67 @@
<?php
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Cron\CronExpression;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Pools\Group;
class ScheduleExecutions extends ScheduleBase
{
public const UPDATE_TIMER = 3; // seconds
public const ENQUEUE_TIMER = 4; // seconds
public static function getName(): string
{
return 'schedule-executions';
}
public static function getSupportedResource(): string
{
return 'execution';
}
protected function enqueueResources(Group $pools, Database $dbForConsole): void
{
foreach ($this->schedules as $schedule) {
if (!$schedule['active'] || CronExpression::isValidExpression($schedule['schedule'])) {
unset($this->schedules[$schedule['resourceId']]);
continue;
}
$now = new \DateTime();
$scheduledAt = new \DateTime($schedule['schedule']);
if ($scheduledAt > $now) {
continue;
}
\go(function () use ($schedule, $pools, $dbForConsole) {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
$queueForFunctions
->setType('schedule')
->setFunctionId($schedule['resource']['functionId'])
->setExecution($schedule['resource'])
->setMethod('POST')
->setPath('/')
->setProject($schedule['project'])
->trigger();
$dbForConsole->deleteDocument(
'schedules',
$schedule['$id'],
);
$queue->reclaim();
unset($this->schedules[$schedule['resourceId']]);
});
}
}
}

View file

@ -11,8 +11,8 @@ use Utopia\Pools\Group;
class ScheduleFunctions extends ScheduleBase
{
public const UPDATE_TIMER = 3; // seconds
public const ENQUEUE_TIMER = 4; // seconds
public const UPDATE_TIMER = 10; // seconds
public const ENQUEUE_TIMER = 60; // seconds
private ?float $lastEnqueueUpdate = null;
@ -40,39 +40,37 @@ class ScheduleFunctions extends ScheduleBase
$delayedExecutions = []; // Group executions with same delay to share one coroutine
foreach ($this->schedules as $scheduleKey => $schedule) {
if (CronExpression::isValidExpression($schedule['schedule'])) {
$cron = new CronExpression($schedule['schedule']);
$nextDate = $cron->getNextRunDate();
} else {
try {
$nextDate = new \DateTime($schedule['schedule']);
$schedule['delete'] = true;
} catch (\Exception) {
Console::error('Failed to parse schedule: ' . $schedule['schedule']);
continue;
}
foreach ($this->schedules as $key => $schedule) {
if (!$schedule['active'] || !CronExpression::isValidExpression($schedule['schedule'])) {
unset($this->schedules[$schedule['resourceId']]);
continue;
}
$cron = new CronExpression($schedule['schedule']);
$nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate);
$currentTick = $next < $timeFrame;
if (!$currentTick) {
continue;
}
$total += 1;
$delay = $nextDate->getTimestamp() - \time(); // Time to wait from now until execution needs to be queued
$total++;
$promiseStart = \time(); // in seconds
$executionStart = $nextDate->getTimestamp(); // in seconds
$delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
if (!isset($delayedExecutions[$delay])) {
$delayedExecutions[$delay] = [];
}
$delayedExecutions[$delay][] = $scheduleKey;
$delayedExecutions[$delay][] = $key;
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $pools, $dbForConsole) {
\go(function () use ($delay, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
@ -83,6 +81,7 @@ class ScheduleFunctions extends ScheduleBase
if (!\array_key_exists($scheduleKey, $this->schedules)) {
return;
}
$schedule = $this->schedules[$scheduleKey];
$queueForFunctions = new Func($connection);
@ -94,13 +93,6 @@ class ScheduleFunctions extends ScheduleBase
->setPath('/')
->setProject($schedule['project'])
->trigger();
if ($schedule['delete']) {
$dbForConsole->deleteDocument(
'schedules',
$schedule['$id'],
);
}
}
$queue->reclaim();

View file

@ -35,7 +35,7 @@ class ScheduleMessages extends ScheduleBase
continue;
}
\go(function () use ($now, $schedule, $pools, $dbForConsole) {
\go(function () use ($schedule, $pools, $dbForConsole) {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);

View file

@ -83,6 +83,7 @@ class Functions extends Action
$eventData = $payload['payload'] ?? '';
$project = new Document($payload['project'] ?? []);
$function = new Document($payload['function'] ?? []);
$functionId = $payload['functionId'] ?? '';
$user = new Document($payload['user'] ?? []);
$method = $payload['method'] ?? 'POST';
$headers = $payload['headers'] ?? [];
@ -92,6 +93,10 @@ class Functions extends Action
return;
}
if ($function->isEmpty() && !empty($functionId)) {
$function = $dbForProject->getDocument('functions', $functionId);
}
$log->addTag('functionId', $function->getId());
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
@ -176,6 +181,7 @@ class Functions extends Action
);
break;
case 'schedule':
$execution = new Document($payload['execution'] ?? []);
$this->execute(
log: $log,
dbForProject: $dbForProject,
@ -193,7 +199,7 @@ class Functions extends Action
jwt: null,
event: null,
eventData: null,
executionId: null,
executionId: $execution->getId() ?? null
);
break;
}
@ -296,7 +302,6 @@ class Functions extends Action
$headers['x-appwrite-user-id'] = $user->getId() ?? '';
$headers['x-appwrite-user-jwt'] = $jwt ?? '';
/** Create execution or update execution status */
/** Create execution or update execution status */
$execution = $dbForProject->getDocument('executions', $executionId ?? '');
if ($execution->isEmpty()) {

View file

@ -57,6 +57,7 @@ class FunctionsCustomClientTest extends Scope
'execute' => [Role::user($this->getUser()['$id'])->toString()],
'runtime' => 'php-8.0',
'entrypoint' => 'index.php',
'logging' => true,
'events' => [
'users.*.create',
'users.*.delete',
@ -246,6 +247,7 @@ class FunctionsCustomClientTest extends Scope
$this->assertEquals(200, $function['headers']['status-code']);
// Schedule execution for the future
\date_default_timezone_set('UTC');
$futureTime = (new \DateTime())->add(new \DateInterval('PT10S'))->format('Y-m-d H:i:s');
$execution = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/executions', array_merge([
'content-type' => 'application/json',
@ -260,7 +262,7 @@ class FunctionsCustomClientTest extends Scope
$executionId = $execution['body']['$id'];
sleep(12);
sleep(20);
$execution = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/executions/' . $executionId, [
'content-type' => 'application/json',
@ -281,8 +283,6 @@ class FunctionsCustomClientTest extends Scope
$this->assertEquals(204, $response['headers']['status-code']);
}
public function testCreateCustomExecution(): array
{
/**