1
0
Fork 0
mirror of synced 2024-06-01 18:39:57 +12:00
This commit is contained in:
shimon 2022-11-04 07:12:08 +02:00
parent 27c4e24fa5
commit bc631d926e
7 changed files with 307 additions and 3 deletions

View file

@ -349,6 +349,7 @@ RUN chmod +x /usr/local/bin/doctor && \
chmod +x /usr/local/bin/realtime && \
chmod +x /usr/local/bin/executor && \
chmod +x /usr/local/bin/schedule && \
chmod +x /usr/local/bin/schedule-new && \
chmod +x /usr/local/bin/sdks && \
chmod +x /usr/local/bin/specs && \
chmod +x /usr/local/bin/ssl && \

View file

@ -10,6 +10,7 @@ use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use InfluxDB\Database as InfluxDatabase;
@ -59,6 +60,33 @@ function getConsoleDB(): Database
return $database;
}
/**
* Get internal project database
* @param Document $project
* @return Database
*/
function getProjectDB(Document $project): Database
{
global $register;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
if ($project->isEmpty() || $project->getId() === 'console') {
return getConsoleDB();
}
$dbAdapter = $pools
->get($project->getAttribute('database'))
->pop()
->getResource()
;
$database = new Database($dbAdapter, getCache());
$database->setNamespace('_' . $project->getInternalId());
return $database;
}
function getCache(): Cache
{
global $register;
@ -92,6 +120,7 @@ include 'tasks/specs.php';
include 'tasks/ssl.php';
include 'tasks/vars.php';
include 'tasks/usage.php';
include 'tasks/schedule.php';
$cli
->task('version')

View file

@ -751,6 +751,96 @@ $collections = [
],
],
'schedules' => [
'$collection' => ID::custom(Database::METADATA),
'$id' => ID::custom('schedules'),
'name' => 'schedules',
'attributes' => [
[
'$id' => ID::custom('type'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('scheduleId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('projectId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('scheduleUpdatedAt'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('schedule'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 256,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('active'),
'type' => Database::VAR_BOOLEAN,
'signed' => true,
'size' => 0,
'format' => '',
'filters' => [],
'required' => false,
'default' => null,
'array' => false,
],
],
'indexes' => [
[
'$id' => ID::custom('_key_type_scheduleUpdatedAt'),
'type' => Database::INDEX_KEY,
'attributes' => ['type','scheduleUpdatedAt'],
'lengths' => [],
'orders' => [],
],
[
'$id' => ID::custom('_key_type_projectId_scheduleId'),
'type' => Database::INDEX_KEY,
'attributes' => ['type', 'projectId', 'scheduleId'],
'lengths' => [],
'orders' => [],
],
],
],
'platforms' => [
'$collection' => ID::custom(Database::METADATA),
'$id' => ID::custom('platforms'),

View file

@ -448,7 +448,8 @@ App::put('/v1/functions/:functionId')
->inject('project')
->inject('user')
->inject('events')
->action(function (string $functionId, string $name, array $execute, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance) {
->inject('dbForConsole')
->action(function (string $functionId, string $name, array $execute, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
@ -474,6 +475,36 @@ App::put('/v1/functions/:functionId')
])));
if ($next) {
$schedule = Authorization::skip(function () use ($dbForConsole, $project, $function) {
return $dbForConsole->findOne('schedules', [
Query::equal('type', ['function']),
Query::equal('projectId', [$project->getId()]),
Query::equal('scheduleId', [$function->getId()]),
]);
});
// TODO constrain with unique index ??
if (empty($schedule)) {
Authorization::skip(
fn() => $dbForConsole->createDocument('schedules', new Document([
'type' => 'function',
'scheduleId' => $function->getId(),
'projectId' => $project->getId(),
'scheduleUpdatedAt' => $function['scheduleUpdatedAt'],
'schedule' => $function['schedule'],
'active' => true,
]))
);
} else {
$schedule
->setAttribute('scheduleUpdatedAt', $function['scheduleUpdatedAt'])
->setAttribute('schedule', $function['schedule'])
;
Authorization::skip(function () use ($dbForConsole, $schedule, $function) {
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
});
}
// Async task reschedule
$functionEvent = new Func();
$functionEvent
@ -560,7 +591,10 @@ App::delete('/v1/functions/:functionId')
->inject('dbForProject')
->inject('deletes')
->inject('events')
->action(function (string $functionId, Response $response, Database $dbForProject, Delete $deletes, Event $events) {
->inject('project')
->inject('dbForConsole')
->action(function (string $functionId, Response $response, Database $dbForProject, Delete $deletes, Event $events, Document $project, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
@ -572,6 +606,25 @@ App::delete('/v1/functions/:functionId')
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB');
}
$schedule = Authorization::skip(function () use ($dbForConsole, $project, $function) {
return $dbForConsole->findOne('schedules', [
Query::equal('type', ['function']),
Query::equal('projectId', [$project->getId()]),
Query::equal('scheduleId', [$function->getId()]),
]);
});
if (!empty($schedule)) {
$schedule
->setAttribute('scheduleUpdatedAt', DateTime::now())
->setAttribute('active', false)
;
Authorization::skip(function () use ($dbForConsole, $schedule) {
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
});
}
$deletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($function);

97
app/tasks/schedule.php Normal file
View file

@ -0,0 +1,97 @@
<?php
global $cli;
global $register;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Query;
use Swoole\Timer;
const FUNCTION_VALIDATION_TIMER = 30; //seconds
const FUNCTION_ENQUEUE_TIMER = 10; //seconds
$cli
->task('schedule-new')
->desc('Function scheduler task')
->action(function () use ($register) {
Console::title('Scheduler V1');
Console::success(APP_NAME . ' Scheduler v1 has started');
sleep(4);
$dbForConsole = getConsoleDB();
$count = 0;
$limit = 50;
$sum = $limit;
$functions = [];
while ($sum === $limit) {
$results = $dbForConsole->find('schedules', [
Query::equal('type', ['function']),
Query::equal('active', [true]),
Query::limit($limit)
]);
$sum = count($results);
foreach ($results as $document) {
$functions[$document['scheduleId']] = $document;
$count++;
}
}
$lastValidationTime = DateTime::format((new \DateTime())->sub(\DateInterval::createFromDateString(FUNCTION_VALIDATION_TIMER . ' seconds')));
Co\run(
function () use ($dbForConsole, &$functions, &$lastValidationTime) {
Timer::tick(FUNCTION_VALIDATION_TIMER * 1000, function () use ($dbForConsole, &$functions, &$lastValidationTime) {
$time = DateTime::now();
Console::success("Validation proc run at : $time");
var_dump($lastValidationTime);
$count = 0;
$limit = 50;
$sum = $limit;
$tmp = [];
while ($sum === $limit) {
var_dump($lastValidationTime);
$results = $dbForConsole->find('schedules', [
Query::equal('type', ['function']),
Query::greaterThan('scheduleUpdatedAt', $lastValidationTime),
Query::limit($limit)
]);
$lastValidationTime = DateTime::now();
$sum = count($results);
foreach ($results as $document) {
$tmp['scheduleId'] = $document;
$count++;
}
}
foreach ($tmp as $document) {
$org = strtotime($functions[$document['scheduleId']]['scheduleUpdatedAt']);
$new = strtotime($document['scheduleUpdatedAt']);
var_dump($document['scheduleId']);
var_dump($document['active']);
if ($document['active'] === false) {
Console::error("Removing : {$document['scheduleId']}");
unset($functions[$document['scheduleId']]);
} elseif (!isset($functions[$document['scheduleId']]) || $new > $org) {
Console::error("Updating : {$document['scheduleId']}");
$functions[$document['scheduleId']] = $document;
}
$count++;
}
});
Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, $functions) {
$time = DateTime::now();
Console::success("Enqueue proc run at : $time");
foreach ($functions as $function) {
Console::info("Enqueueing : {$function->getid()}");
}
});
}
);
});

3
bin/schedule-new Normal file
View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/cli.php schedule-new $@

View file

@ -108,6 +108,7 @@ services:
- ./public:/usr/src/code/public
- ./src:/usr/src/code/src
- ./dev:/usr/local/dev
- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
depends_on:
- mariadb
- redis
@ -684,6 +685,36 @@ services:
- _APP_LOGGING_PROVIDER
- _APP_LOGGING_CONFIG
appwrite-schedule-new:
entrypoint: schedule-new
<<: *x-logging
container_name: appwrite-schedule-new
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
- ./vendor/utopia-php/database:/usr/src/code/vendor/utopia-php/database
depends_on:
- mariadb
- redis
environment:
- _APP_ENV
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_CONNECTIONS_DB_PROJECT
- _APP_CONNECTIONS_DB_CONSOLE
- _APP_CONNECTIONS_CACHE
- _APP_CONNECTIONS_QUEUE
appwrite-schedule:
entrypoint: schedule
<<: *x-logging
@ -732,7 +763,7 @@ services:
# - RELAY_FROM_HOSTS=192.168.0.0/16 ; *.yourdomain.com
# - SMARTHOST_HOST=smtp
# - SMARTHOST_PORT=587
redis:
image: redis:7.0.4-alpine
<<: *x-logging