diff --git a/packages/server/src/automations/bullboard.js b/packages/server/src/automations/bullboard.js index cba6594ae7..af091843f2 100644 --- a/packages/server/src/automations/bullboard.js +++ b/packages/server/src/automations/bullboard.js @@ -8,12 +8,14 @@ const Queue = env.isTest() const { JobQueues } = require("../constants") const { utils } = require("@budibase/backend-core/redis") const { opts, redisProtocolUrl } = utils.getRedisOptions() +const listeners = require("./listeners") const CLEANUP_PERIOD_MS = 60 * 1000 const queueConfig = redisProtocolUrl || { redis: opts } let cleanupInternal = null let automationQueue = new Queue(JobQueues.AUTOMATIONS, queueConfig) +listeners.addListeners(automationQueue) async function cleanup() { await automationQueue.clean(CLEANUP_PERIOD_MS, "completed") diff --git a/packages/server/src/automations/listeners.ts b/packages/server/src/automations/listeners.ts new file mode 100644 index 0000000000..d53d0dddee --- /dev/null +++ b/packages/server/src/automations/listeners.ts @@ -0,0 +1,78 @@ +import { Queue, Job, JobId } from "bull" +import { AutomationEvent } from "../definitions/automations" +import * as automation from "../threads/automation" + +export const addListeners = (queue: Queue) => { + logging(queue) + // handleStalled(queue) +} + +const handleStalled = (queue: Queue) => { + queue.on("active", async (job: Job) => { + await automation.removeStalled(job as AutomationEvent) + }) +} + +const logging = (queue: Queue) => { + if (process.env.NODE_DEBUG?.includes("bull")) { + queue + .on("error", (error: any) => { + // An error occurred. + console.error(`automation-event=error error=${JSON.stringify(error)}`) + }) + .on("waiting", (jobId: JobId) => { + // A Job is waiting to be processed as soon as a worker is idling. + console.log(`automation-event=waiting jobId=${jobId}`) + }) + .on("active", (job: Job, jobPromise: any) => { + // A job has started. You can use `jobPromise.cancel()`` to abort it. + console.log(`automation-event=active jobId=${job.id}`) + }) + .on("stalled", (job: Job) => { + // A job has been marked as stalled. This is useful for debugging job + // workers that crash or pause the event loop. + console.error( + `automation-event=stalled jobId=${job.id} job=${JSON.stringify(job)}` + ) + }) + .on("progress", (job: Job, progress: any) => { + // A job's progress was updated! + console.log( + `automation-event=progress jobId=${job.id} progress=${progress}` + ) + }) + .on("completed", (job: Job, result) => { + // A job successfully completed with a `result`. + console.log( + `automation-event=completed jobId=${job.id} result=${result}` + ) + }) + .on("failed", (job, err: any) => { + // A job failed with reason `err`! + console.log(`automation-event=failed jobId=${job.id} error=${err}`) + }) + .on("paused", () => { + // The queue has been paused. + console.log(`automation-event=paused`) + }) + .on("resumed", (job: Job) => { + // The queue has been resumed. + console.log(`automation-event=paused jobId=${job.id}`) + }) + .on("cleaned", (jobs: Job[], type: string) => { + // Old jobs have been cleaned from the queue. `jobs` is an array of cleaned + // jobs, and `type` is the type of jobs cleaned. + console.log( + `automation-event=cleaned length=${jobs.length} type=${type}` + ) + }) + .on("drained", () => { + // Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed) + console.log(`automation-event=drained`) + }) + .on("removed", (job: Job) => { + // A job successfully removed. + console.log(`automation-event=removed jobId=${job.id}`) + }) + } +} diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 6e392d23de..e0979ac0d9 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -21,11 +21,13 @@ const WH_STEP_ID = definitions.WEBHOOK.stepId const CRON_STEP_ID = definitions.CRON.stepId const Runner = new Thread(ThreadType.AUTOMATION) +const jobMessage = (job: any, message: string) => { + return `app=${job.data.event.appId} automation=${job.data.automation._id} jobId=${job.id} trigger=${job.data.automation.definition.trigger.event} : ${message}` +} + export async function processEvent(job: any) { try { - console.log( - `${job.data.automation.appId} automation ${job.data.automation._id} running. jobId=${job.id}` - ) + console.log(jobMessage(job, "running")) // need to actually await these so that an error can be captured properly const tenantId = tenancy.getTenantIDFromAppID(job.data.event.appId) return await tenancy.doInTenant(tenantId, async () => { @@ -34,9 +36,7 @@ export async function processEvent(job: any) { }) } catch (err) { const errJson = JSON.stringify(err) - console.error( - `${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${errJson}` - ) + console.error(jobMessage(job, `was unable to run - ${errJson}`)) console.trace(err) return { err } } @@ -91,6 +91,7 @@ export async function disableAllCrons(appId: any) { export async function disableCron(jobId: string, jobKey: string) { await queue.removeRepeatableByKey(jobKey) await queue.removeJobs(jobId) + console.log(`jobId=${jobId} disabled`) } export async function clearMetadata() { diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index d04c49ce79..04b6ae413c 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -133,27 +133,34 @@ class Orchestrator { return metadata } + async stopCron(reason: string) { + if (!this._repeat) { + return + } + logWarn( + `CRON disabled reason=${reason} - ${this._appId}/${this._automation._id}` + ) + const automation = this._automation + const trigger = automation.definition.trigger + await disableCron(this._repeat?.jobId, this._repeat?.jobKey) + this.updateExecutionOutput( + trigger.id, + trigger.stepId, + {}, + { + status: AutomationStatus.STOPPED_ERROR, + success: false, + } + ) + await storeLog(automation, this.executionOutput) + } + async checkIfShouldStop(metadata: AutomationMetadata): Promise { if (!metadata.errorCount || !this._repeat) { return false } - const automation = this._automation - const trigger = automation.definition.trigger if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { - logWarn( - `CRON disabled due to errors - ${this._appId}/${this._automation._id}` - ) - await disableCron(this._repeat?.jobId, this._repeat?.jobKey) - this.updateExecutionOutput( - trigger.id, - trigger.stepId, - {}, - { - status: AutomationStatus.STOPPED_ERROR, - success: false, - } - ) - await storeLog(automation, this.executionOutput) + await this.stopCron("errors") return true } return false @@ -465,3 +472,15 @@ export function execute(input: AutomationEvent, callback: WorkerCallback) { } }) } + +export const removeStalled = (input: AutomationEvent) => { + const appId = input.data.event.appId + doInAppContext(appId, async () => { + const automationOrchestrator = new Orchestrator( + input.data.automation, + input.data.event, + input.opts + ) + await automationOrchestrator.stopCron("stalled") + }) +}