diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 333accc985..62b971f9f5 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -63,12 +63,12 @@ class InMemoryQueue implements Partial { * Same callback API as Bull, each callback passed to this will consume messages as they are * available. Please note this is a queue service, not a notification service, so each * consumer will receive different messages. - * @param func The callback function which will return a "Job", the same * as the Bull API, within this job the property "data" contains the JSON message. Please * note this is incredibly limited compared to Bull as in reality the Job would contain * a lot more information about the queue and current status of Bull cluster. */ - async process(func: any) { + async process(concurrencyOrFunc: number | any, func?: any) { + func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc this._emitter.on("message", async () => { if (this._messages.length <= 0) { return diff --git a/packages/server/src/appMigrations/appMigrationMetadata.ts b/packages/server/src/appMigrations/appMigrationMetadata.ts index d87ddff3ef..613f46cf3d 100644 --- a/packages/server/src/appMigrations/appMigrationMetadata.ts +++ b/packages/server/src/appMigrations/appMigrationMetadata.ts @@ -1,4 +1,4 @@ -import { Duration, cache, context, db, env } from "@budibase/backend-core" +import { Duration, cache, db, env } from "@budibase/backend-core" import { Database, DocumentType, Document } from "@budibase/types" export interface AppMigrationDoc extends Document { @@ -42,7 +42,10 @@ export async function getAppMigrationVersion(appId: string): Promise { version = "" } - await cache.store(cacheKey, version, EXPIRY_SECONDS) + // only cache if we have a valid version + if (version) { + await cache.store(cacheKey, version, EXPIRY_SECONDS) + } return version } @@ -54,8 +57,7 @@ export async function updateAppMigrationMetadata({ appId: string version: string }): Promise { - const db = context.getAppDB() - + const appDb = db.getDB(appId) let appMigrationDoc: AppMigrationDoc try { @@ -70,7 +72,7 @@ export async function updateAppMigrationMetadata({ version: "", history: {}, } - await db.put(appMigrationDoc) + await appDb.put(appMigrationDoc) appMigrationDoc = await getFromDB(appId) } @@ -82,7 +84,7 @@ export async function updateAppMigrationMetadata({ [version]: { runAt: new Date().toISOString() }, }, } - await db.put(updatedMigrationDoc) + await appDb.put(updatedMigrationDoc) const cacheKey = getCacheKey(appId) diff --git a/packages/server/src/appMigrations/index.ts b/packages/server/src/appMigrations/index.ts index 0440580d18..de15666215 100644 --- a/packages/server/src/appMigrations/index.ts +++ b/packages/server/src/appMigrations/index.ts @@ -16,7 +16,10 @@ export type AppMigration = { export function getLatestEnabledMigrationId(migrations?: AppMigration[]) { let latestMigrationId: string | undefined - for (let migration of migrations || MIGRATIONS) { + if (!migrations) { + migrations = MIGRATIONS + } + for (let migration of migrations) { // if a migration is disabled, all migrations after it are disabled if (migration.disabled) { break @@ -35,8 +38,14 @@ export async function checkMissingMigrations( next: Next, appId: string ) { - const currentVersion = await getAppMigrationVersion(appId) const latestMigration = getLatestEnabledMigrationId() + + // no migrations set - edge case, don't try to do anything + if (!latestMigration) { + return next() + } + + const currentVersion = await getAppMigrationVersion(appId) const queue = getAppMigrationQueue() if ( diff --git a/packages/server/src/appMigrations/migrationsProcessor.ts b/packages/server/src/appMigrations/migrationsProcessor.ts index 0337fc09f4..1441388564 100644 --- a/packages/server/src/appMigrations/migrationsProcessor.ts +++ b/packages/server/src/appMigrations/migrationsProcessor.ts @@ -13,8 +13,8 @@ export async function processMigrations( ) { console.log(`Processing app migration for "${appId}"`) try { - // have to wrap in context, this gets the tenant from the app ID - await context.doInAppContext(appId, async () => { + // first step - setup full context - tenancy, app and guards + await context.doInAppMigrationContext(appId, async () => { console.log(`Acquiring app migration lock for "${appId}"`) await locks.doWithLock( { @@ -23,48 +23,45 @@ export async function processMigrations( resource: appId, }, async () => { - await context.doInAppMigrationContext(appId, async () => { - console.log(`Lock acquired starting app migration for "${appId}"`) - let currentVersion = await getAppMigrationVersion(appId) + console.log(`Lock acquired starting app migration for "${appId}"`) + let currentVersion = await getAppMigrationVersion(appId) - const pendingMigrations = migrations - .filter(m => m.id > currentVersion) - .sort((a, b) => a.id.localeCompare(b.id)) + const pendingMigrations = migrations + .filter(m => m.id > currentVersion) + .sort((a, b) => a.id.localeCompare(b.id)) - const migrationIds = migrations.map(m => m.id).sort() - console.log( - `App migrations to run for "${appId}" - ${migrationIds.join(",")}` - ) + const migrationIds = migrations.map(m => m.id).sort() + console.log( + `App migrations to run for "${appId}" - ${migrationIds.join(",")}` + ) - let index = 0 - for (const { id, func } of pendingMigrations) { - const expectedMigration = - migrationIds[migrationIds.indexOf(currentVersion) + 1] + let index = 0 + for (const { id, func } of pendingMigrations) { + const expectedMigration = + migrationIds[migrationIds.indexOf(currentVersion) + 1] - if (expectedMigration !== id) { - throw new Error( - `Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected` - ) - } - - const counter = `(${++index}/${pendingMigrations.length})` - console.info(`Running migration ${id}... ${counter}`, { - migrationId: id, - appId, - }) - await func() - await updateAppMigrationMetadata({ - appId, - version: id, - }) - currentVersion = id + if (expectedMigration !== id) { + throw new Error( + `Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected` + ) } - }) + + const counter = `(${++index}/${pendingMigrations.length})` + console.info(`Running migration ${id}... ${counter}`, { + migrationId: id, + appId, + }) + await func() + await updateAppMigrationMetadata({ + appId, + version: id, + }) + currentVersion = id + } } ) - - console.log(`App migration for "${appId}" processed`) }) + console.log(`App migration for "${appId}" processed`) } catch (err) { logging.logAlert("Failed to run app migration", err) throw err diff --git a/packages/server/src/appMigrations/queue.ts b/packages/server/src/appMigrations/queue.ts index 5c932bcb7f..e2bc4406f1 100644 --- a/packages/server/src/appMigrations/queue.ts +++ b/packages/server/src/appMigrations/queue.ts @@ -2,9 +2,10 @@ import { queue, logging } from "@budibase/backend-core" import { Job } from "bull" import { MIGRATIONS } from "./migrations" import { processMigrations } from "./migrationsProcessor" -import { apiEnabled } from "../features" -const MAX_ATTEMPTS = 1 +const MAX_ATTEMPTS = 3 +// max number of migrations to run at same time, per node +const MIGRATION_CONCURRENCY = 5 export type AppMigrationJob = { appId: string @@ -13,10 +14,6 @@ export type AppMigrationJob = { let appMigrationQueue: queue.Queue | undefined export function init() { - // only run app migrations in main API services - if (!apiEnabled()) { - return - } appMigrationQueue = queue.createQueue( queue.JobQueue.APP_MIGRATION, { @@ -34,10 +31,10 @@ export function init() { } ) - return appMigrationQueue.process(processMessage) + return appMigrationQueue.process(MIGRATION_CONCURRENCY, processMessage) } -async function processMessage(job: Job) { +async function processMessage(job: Job) { const { appId } = job.data await processMigrations(appId, MIGRATIONS) diff --git a/packages/server/src/startup/index.ts b/packages/server/src/startup/index.ts index 750acdb0aa..c14ec0ca1b 100644 --- a/packages/server/src/startup/index.ts +++ b/packages/server/src/startup/index.ts @@ -115,8 +115,9 @@ export async function startup( // configure events to use the pro audit log write // can't integrate directly into backend-core due to cyclic issues queuePromises.push(events.processors.init(pro.sdk.auditLogs.write)) - queuePromises.push(appMigrations.init()) + // app migrations and automations on other service if (automationsEnabled()) { + queuePromises.push(appMigrations.init()) queuePromises.push(automations.init()) } queuePromises.push(initPro())