From 4c873b9921e2645c6f43614df81996e0c3c32da9 Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Mon, 10 Jun 2024 22:38:16 +0100 Subject: [PATCH 1/5] Attempting to fix some potential app migration issues around versions. --- .../src/appMigrations/appMigrationMetadata.ts | 14 ++++++++------ packages/server/src/appMigrations/index.ts | 5 ++++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/packages/server/src/appMigrations/appMigrationMetadata.ts b/packages/server/src/appMigrations/appMigrationMetadata.ts index d87ddff3ef..613f46cf3d 100644 --- a/packages/server/src/appMigrations/appMigrationMetadata.ts +++ b/packages/server/src/appMigrations/appMigrationMetadata.ts @@ -1,4 +1,4 @@ -import { Duration, cache, context, db, env } from "@budibase/backend-core" +import { Duration, cache, db, env } from "@budibase/backend-core" import { Database, DocumentType, Document } from "@budibase/types" export interface AppMigrationDoc extends Document { @@ -42,7 +42,10 @@ export async function getAppMigrationVersion(appId: string): Promise { version = "" } - await cache.store(cacheKey, version, EXPIRY_SECONDS) + // only cache if we have a valid version + if (version) { + await cache.store(cacheKey, version, EXPIRY_SECONDS) + } return version } @@ -54,8 +57,7 @@ export async function updateAppMigrationMetadata({ appId: string version: string }): Promise { - const db = context.getAppDB() - + const appDb = db.getDB(appId) let appMigrationDoc: AppMigrationDoc try { @@ -70,7 +72,7 @@ export async function updateAppMigrationMetadata({ version: "", history: {}, } - await db.put(appMigrationDoc) + await appDb.put(appMigrationDoc) appMigrationDoc = await getFromDB(appId) } @@ -82,7 +84,7 @@ export async function updateAppMigrationMetadata({ [version]: { runAt: new Date().toISOString() }, }, } - await db.put(updatedMigrationDoc) + await appDb.put(updatedMigrationDoc) const cacheKey = getCacheKey(appId) diff --git a/packages/server/src/appMigrations/index.ts b/packages/server/src/appMigrations/index.ts index 89c71ae26f..a24bf9c0a3 100644 --- a/packages/server/src/appMigrations/index.ts +++ b/packages/server/src/appMigrations/index.ts @@ -16,7 +16,10 @@ export type AppMigration = { export function getLatestEnabledMigrationId(migrations?: AppMigration[]) { let latestMigrationId: string | undefined - for (let migration of migrations || MIGRATIONS) { + if (!migrations) { + migrations = MIGRATIONS + } + for (let migration of migrations) { // if a migration is disabled, all migrations after it are disabled if (migration.disabled) { break From 8c1735a1bd7068e127ef80ac9675c5775f79d8a5 Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Mon, 10 Jun 2024 22:58:28 +0100 Subject: [PATCH 2/5] Adding concurrency, and changing how context is set. --- .../src/appMigrations/migrationsProcessor.ts | 69 +++++++++---------- packages/server/src/appMigrations/queue.ts | 13 ++-- packages/server/src/startup/index.ts | 3 +- 3 files changed, 40 insertions(+), 45 deletions(-) diff --git a/packages/server/src/appMigrations/migrationsProcessor.ts b/packages/server/src/appMigrations/migrationsProcessor.ts index 0337fc09f4..1441388564 100644 --- a/packages/server/src/appMigrations/migrationsProcessor.ts +++ b/packages/server/src/appMigrations/migrationsProcessor.ts @@ -13,8 +13,8 @@ export async function processMigrations( ) { console.log(`Processing app migration for "${appId}"`) try { - // have to wrap in context, this gets the tenant from the app ID - await context.doInAppContext(appId, async () => { + // first step - setup full context - tenancy, app and guards + await context.doInAppMigrationContext(appId, async () => { console.log(`Acquiring app migration lock for "${appId}"`) await locks.doWithLock( { @@ -23,48 +23,45 @@ export async function processMigrations( resource: appId, }, async () => { - await context.doInAppMigrationContext(appId, async () => { - console.log(`Lock acquired starting app migration for "${appId}"`) - let currentVersion = await getAppMigrationVersion(appId) + console.log(`Lock acquired starting app migration for "${appId}"`) + let currentVersion = await getAppMigrationVersion(appId) - const pendingMigrations = migrations - .filter(m => m.id > currentVersion) - .sort((a, b) => a.id.localeCompare(b.id)) + const pendingMigrations = migrations + .filter(m => m.id > currentVersion) + .sort((a, b) => a.id.localeCompare(b.id)) - const migrationIds = migrations.map(m => m.id).sort() - console.log( - `App migrations to run for "${appId}" - ${migrationIds.join(",")}` - ) + const migrationIds = migrations.map(m => m.id).sort() + console.log( + `App migrations to run for "${appId}" - ${migrationIds.join(",")}` + ) - let index = 0 - for (const { id, func } of pendingMigrations) { - const expectedMigration = - migrationIds[migrationIds.indexOf(currentVersion) + 1] + let index = 0 + for (const { id, func } of pendingMigrations) { + const expectedMigration = + migrationIds[migrationIds.indexOf(currentVersion) + 1] - if (expectedMigration !== id) { - throw new Error( - `Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected` - ) - } - - const counter = `(${++index}/${pendingMigrations.length})` - console.info(`Running migration ${id}... ${counter}`, { - migrationId: id, - appId, - }) - await func() - await updateAppMigrationMetadata({ - appId, - version: id, - }) - currentVersion = id + if (expectedMigration !== id) { + throw new Error( + `Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected` + ) } - }) + + const counter = `(${++index}/${pendingMigrations.length})` + console.info(`Running migration ${id}... ${counter}`, { + migrationId: id, + appId, + }) + await func() + await updateAppMigrationMetadata({ + appId, + version: id, + }) + currentVersion = id + } } ) - - console.log(`App migration for "${appId}" processed`) }) + console.log(`App migration for "${appId}" processed`) } catch (err) { logging.logAlert("Failed to run app migration", err) throw err diff --git a/packages/server/src/appMigrations/queue.ts b/packages/server/src/appMigrations/queue.ts index 5c932bcb7f..e2bc4406f1 100644 --- a/packages/server/src/appMigrations/queue.ts +++ b/packages/server/src/appMigrations/queue.ts @@ -2,9 +2,10 @@ import { queue, logging } from "@budibase/backend-core" import { Job } from "bull" import { MIGRATIONS } from "./migrations" import { processMigrations } from "./migrationsProcessor" -import { apiEnabled } from "../features" -const MAX_ATTEMPTS = 1 +const MAX_ATTEMPTS = 3 +// max number of migrations to run at same time, per node +const MIGRATION_CONCURRENCY = 5 export type AppMigrationJob = { appId: string @@ -13,10 +14,6 @@ export type AppMigrationJob = { let appMigrationQueue: queue.Queue | undefined export function init() { - // only run app migrations in main API services - if (!apiEnabled()) { - return - } appMigrationQueue = queue.createQueue( queue.JobQueue.APP_MIGRATION, { @@ -34,10 +31,10 @@ export function init() { } ) - return appMigrationQueue.process(processMessage) + return appMigrationQueue.process(MIGRATION_CONCURRENCY, processMessage) } -async function processMessage(job: Job) { +async function processMessage(job: Job) { const { appId } = job.data await processMigrations(appId, MIGRATIONS) diff --git a/packages/server/src/startup/index.ts b/packages/server/src/startup/index.ts index 750acdb0aa..c14ec0ca1b 100644 --- a/packages/server/src/startup/index.ts +++ b/packages/server/src/startup/index.ts @@ -115,8 +115,9 @@ export async function startup( // configure events to use the pro audit log write // can't integrate directly into backend-core due to cyclic issues queuePromises.push(events.processors.init(pro.sdk.auditLogs.write)) - queuePromises.push(appMigrations.init()) + // app migrations and automations on other service if (automationsEnabled()) { + queuePromises.push(appMigrations.init()) queuePromises.push(automations.init()) } queuePromises.push(initPro()) From 553c2186b1b30e1694bd66222e8519b2057dbb28 Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Mon, 10 Jun 2024 23:01:39 +0100 Subject: [PATCH 3/5] Only try to lookup migrations if there are migrations to work with. --- packages/server/src/appMigrations/index.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/server/src/appMigrations/index.ts b/packages/server/src/appMigrations/index.ts index 5aa28e14db..744f5a328f 100644 --- a/packages/server/src/appMigrations/index.ts +++ b/packages/server/src/appMigrations/index.ts @@ -38,8 +38,14 @@ export async function checkMissingMigrations( next: Next, appId: string ) { - const currentVersion = await getAppMigrationVersion(appId) const latestMigration = getLatestEnabledMigrationId() + + // no migrations set - edge case, don't try to do anything + if (!latestMigration) { + return + } + + const currentVersion = await getAppMigrationVersion(appId) const queue = getAppMigrationQueue() if ( From d0736cbe9e46ad70248aa6b39df57f12eee0db5a Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Mon, 10 Jun 2024 23:11:53 +0100 Subject: [PATCH 4/5] Missing next(). --- packages/server/src/appMigrations/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/appMigrations/index.ts b/packages/server/src/appMigrations/index.ts index 744f5a328f..de15666215 100644 --- a/packages/server/src/appMigrations/index.ts +++ b/packages/server/src/appMigrations/index.ts @@ -42,7 +42,7 @@ export async function checkMissingMigrations( // no migrations set - edge case, don't try to do anything if (!latestMigration) { - return + return next() } const currentVersion = await getAppMigrationVersion(appId) From 75c3b842ade3c28d08687b816e68bb72b3160ccb Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Mon, 10 Jun 2024 23:34:08 +0100 Subject: [PATCH 5/5] Fixing issue with in memory queue. --- packages/backend-core/src/queue/inMemoryQueue.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 333accc985..62b971f9f5 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -63,12 +63,12 @@ class InMemoryQueue implements Partial { * Same callback API as Bull, each callback passed to this will consume messages as they are * available. Please note this is a queue service, not a notification service, so each * consumer will receive different messages. - * @param func The callback function which will return a "Job", the same * as the Bull API, within this job the property "data" contains the JSON message. Please * note this is incredibly limited compared to Bull as in reality the Job would contain * a lot more information about the queue and current status of Bull cluster. */ - async process(func: any) { + async process(concurrencyOrFunc: number | any, func?: any) { + func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc this._emitter.on("message", async () => { if (this._messages.length <= 0) { return