diff --git a/packages/server/src/automations/listeners.ts b/packages/backend-core/src/queue/listeners.ts similarity index 52% rename from packages/server/src/automations/listeners.ts rename to packages/backend-core/src/queue/listeners.ts index 9f8667bd29..c5db628ef0 100644 --- a/packages/server/src/automations/listeners.ts +++ b/packages/backend-core/src/queue/listeners.ts @@ -1,78 +1,90 @@ -import { Queue, Job, JobId } from "bull" -import { AutomationEvent } from "../definitions/automations" -import * as automation from "../threads/automation" +import { Job, JobId, Queue } from "bull" +import { JobQueue } from "./constants" -export const addListeners = (queue: Queue) => { - logging(queue) - handleStalled(queue) +export type StalledFn = (job: Job) => Promise + +export const addListeners = ( + queue: Queue, + jobQueue: JobQueue, + removeStalled?: StalledFn +) => { + logging(queue, jobQueue) + if (removeStalled) { + handleStalled(queue, removeStalled) + } } -const handleStalled = (queue: Queue) => { +const handleStalled = (queue: Queue, removeStalled: StalledFn) => { queue.on("stalled", async (job: Job) => { - await automation.removeStalled(job as AutomationEvent) + await removeStalled(job) }) } -const logging = (queue: Queue) => { +const logging = (queue: Queue, jobQueue: JobQueue) => { + let eventType: string + switch (jobQueue) { + case JobQueue.AUTOMATIONS: + eventType = "automation-event" + break + case JobQueue.APP_BACKUPS: + eventType = "app-backup-event" + break + } if (process.env.NODE_DEBUG?.includes("bull")) { queue .on("error", (error: any) => { // An error occurred. - console.error(`automation-event=error error=${JSON.stringify(error)}`) + console.error(`${eventType}=error error=${JSON.stringify(error)}`) }) .on("waiting", (jobId: JobId) => { // A Job is waiting to be processed as soon as a worker is idling. - console.log(`automation-event=waiting jobId=${jobId}`) + console.log(`${eventType}=waiting jobId=${jobId}`) }) .on("active", (job: Job, jobPromise: any) => { // A job has started. You can use `jobPromise.cancel()`` to abort it. - console.log(`automation-event=active jobId=${job.id}`) + console.log(`${eventType}=active jobId=${job.id}`) }) .on("stalled", (job: Job) => { // A job has been marked as stalled. This is useful for debugging job // workers that crash or pause the event loop. console.error( - `automation-event=stalled jobId=${job.id} job=${JSON.stringify(job)}` + `${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}` ) }) .on("progress", (job: Job, progress: any) => { // A job's progress was updated! console.log( - `automation-event=progress jobId=${job.id} progress=${progress}` + `${eventType}=progress jobId=${job.id} progress=${progress}` ) }) .on("completed", (job: Job, result) => { // A job successfully completed with a `result`. - console.log( - `automation-event=completed jobId=${job.id} result=${result}` - ) + console.log(`${eventType}=completed jobId=${job.id} result=${result}`) }) .on("failed", (job, err: any) => { // A job failed with reason `err`! - console.log(`automation-event=failed jobId=${job.id} error=${err}`) + console.log(`${eventType}=failed jobId=${job.id} error=${err}`) }) .on("paused", () => { // The queue has been paused. - console.log(`automation-event=paused`) + console.log(`${eventType}=paused`) }) .on("resumed", (job: Job) => { // The queue has been resumed. - console.log(`automation-event=paused jobId=${job.id}`) + console.log(`${eventType}=paused jobId=${job.id}`) }) .on("cleaned", (jobs: Job[], type: string) => { // Old jobs have been cleaned from the queue. `jobs` is an array of cleaned // jobs, and `type` is the type of jobs cleaned. - console.log( - `automation-event=cleaned length=${jobs.length} type=${type}` - ) + console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`) }) .on("drained", () => { // Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed) - console.log(`automation-event=drained`) + console.log(`${eventType}=drained`) }) .on("removed", (job: Job) => { // A job successfully removed. - console.log(`automation-event=removed jobId=${job.id}`) + console.log(`${eventType}=removed jobId=${job.id}`) }) } } diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts index de2c738ca4..8f7ac79e7a 100644 --- a/packages/backend-core/src/queue/queue.ts +++ b/packages/backend-core/src/queue/queue.ts @@ -1,9 +1,9 @@ import env from "../environment" import { getRedisOptions } from "../redis/utils" import { JobQueue } from "./constants" -import inMemoryQueue from "./inMemoryQueue" -import BullQueue from "bull" import InMemoryQueue from "./inMemoryQueue" +import BullQueue from "bull" +import { addListeners, StalledFn } from "./listeners" const { opts, redisProtocolUrl } = getRedisOptions() const CLEANUP_PERIOD_MS = 60 * 1000 @@ -16,14 +16,18 @@ async function cleanup() { } } -export function createQueue(jobQueue: JobQueue) { +export function createQueue( + jobQueue: JobQueue, + removeStalled: StalledFn +): BullQueue.Queue { const queueConfig: any = redisProtocolUrl || { redis: opts } let queue: any if (env.isTest()) { queue = new BullQueue(jobQueue, queueConfig) } else { - queue = new inMemoryQueue(jobQueue, queueConfig) + queue = new InMemoryQueue(jobQueue, queueConfig) } + addListeners(queue, jobQueue, removeStalled) QUEUES.push(queue) if (!cleanupInterval) { cleanupInterval = setInterval(cleanup, CLEANUP_PERIOD_MS) diff --git a/packages/server/src/automations/bullboard.js b/packages/server/src/automations/bullboard.js index 9dfa7546ec..af2243f13d 100644 --- a/packages/server/src/automations/bullboard.js +++ b/packages/server/src/automations/bullboard.js @@ -2,10 +2,12 @@ const { createBullBoard } = require("@bull-board/api") const { BullAdapter } = require("@bull-board/api/bullAdapter") const { KoaAdapter } = require("@bull-board/koa") const { queue } = require("@budibase/backend-core") -const listeners = require("./listeners") +const automation = require("../threads/automation") -let automationQueue = queue.createQueue(queue.JobQueue.AUTOMATIONS) -listeners.addListeners(automationQueue) +let automationQueue = queue.createQueue( + queue.JobQueue.AUTOMATIONS, + automation.removeStalled +) const PATH_PREFIX = "/bulladmin"