diff --git a/packages/backend-core/src/queue/listeners.ts b/packages/backend-core/src/queue/listeners.ts index c5db628ef0..0e06fb5ef0 100644 --- a/packages/backend-core/src/queue/listeners.ts +++ b/packages/backend-core/src/queue/listeners.ts @@ -3,24 +3,35 @@ import { JobQueue } from "./constants" export type StalledFn = (job: Job) => Promise -export const addListeners = ( +export function addListeners( queue: Queue, jobQueue: JobQueue, removeStalled?: StalledFn -) => { +) { logging(queue, jobQueue) if (removeStalled) { handleStalled(queue, removeStalled) } } -const handleStalled = (queue: Queue, removeStalled: StalledFn) => { +function handleStalled(queue: Queue, removeStalled?: StalledFn) { queue.on("stalled", async (job: Job) => { - await removeStalled(job) + if (removeStalled) { + await removeStalled(job) + } else if (job.opts.repeat) { + const jobId = job.id + const repeatJobs = await queue.getRepeatableJobs() + for (let repeatJob of repeatJobs) { + if (repeatJob.id === jobId) { + await queue.removeRepeatableByKey(repeatJob.key) + } + } + console.log(`jobId=${jobId} disabled`) + } }) } -const logging = (queue: Queue, jobQueue: JobQueue) => { +function logging(queue: Queue, jobQueue: JobQueue) { let eventType: string switch (jobQueue) { case JobQueue.AUTOMATIONS: diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts index 8f7ac79e7a..d83e421456 100644 --- a/packages/backend-core/src/queue/queue.ts +++ b/packages/backend-core/src/queue/queue.ts @@ -18,7 +18,7 @@ async function cleanup() { export function createQueue( jobQueue: JobQueue, - removeStalled: StalledFn + removeStalled?: StalledFn ): BullQueue.Queue { const queueConfig: any = redisProtocolUrl || { redis: opts } let queue: any diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 54b8078f30..0eebcb21cf 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -91,9 +91,13 @@ export async function disableAllCrons(appId: any) { return Promise.all(promises) } -export async function disableCron(jobId: string, jobKey: string) { - await automationQueue.removeRepeatableByKey(jobKey) - await automationQueue.removeJobs(jobId) +export async function disableCronById(jobId: number | string) { + const repeatJobs = await automationQueue.getRepeatableJobs() + for (let repeatJob of repeatJobs) { + if (repeatJob.id === jobId) { + await automationQueue.removeRepeatableByKey(repeatJob.key) + } + } console.log(`jobId=${jobId} disabled`) } diff --git a/packages/server/src/definitions/automations.ts b/packages/server/src/definitions/automations.ts index ed1455c049..877a1b4579 100644 --- a/packages/server/src/definitions/automations.ts +++ b/packages/server/src/definitions/automations.ts @@ -27,18 +27,6 @@ export interface TriggerOutput { timestamp?: number } -export interface AutomationEvent { - data: { - automation: Automation - event: any - } - opts?: { - repeat?: { - jobId: string - } - } -} - export interface AutomationContext extends AutomationResults { steps: any[] trigger: any diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 64ae9439d8..b4b290462e 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -1,6 +1,11 @@ import { default as threadUtils } from "./utils" +import { Job } from "bull" threadUtils.threadSetup() -import { isRecurring, disableCron, isErrorInOutput } from "../automations/utils" +import { + isRecurring, + disableCronById, + isErrorInOutput, +} from "../automations/utils" import { default as actions } from "../automations/actions" import { default as automationUtils } from "../automations/automationUtils" import { default as AutomationEmitter } from "../events/AutomationEmitter" @@ -13,7 +18,6 @@ import { LoopStep, LoopStepType, LoopInput, - AutomationEvent, TriggerOutput, AutomationContext, AutomationMetadata, @@ -73,19 +77,16 @@ class Orchestrator { _automation: Automation _emitter: any _context: AutomationContext - _repeat?: { jobId: string; jobKey: string } + _job: Job executionOutput: AutomationContext - constructor(automation: Automation, triggerOutput: TriggerOutput, opts: any) { + constructor(job: Job) { + let automation = job.data.automation, + triggerOutput = job.data.event const metadata = triggerOutput.metadata this._chainCount = metadata ? metadata.automationChainCount : 0 this._appId = triggerOutput.appId as string - if (opts?.repeat) { - this._repeat = { - jobId: opts.repeat.jobId, - jobKey: opts.repeat.key, - } - } + this._job = job const triggerStepId = automation.definition.trigger.stepId triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput) // remove from context @@ -134,7 +135,7 @@ class Orchestrator { } async stopCron(reason: string) { - if (!this._repeat) { + if (!this._job.opts.repeat) { return } logWarn( @@ -142,7 +143,7 @@ class Orchestrator { ) const automation = this._automation const trigger = automation.definition.trigger - await disableCron(this._repeat?.jobId, this._repeat?.jobKey) + await disableCronById(this._job.id) this.updateExecutionOutput( trigger.id, trigger.stepId, @@ -156,7 +157,7 @@ class Orchestrator { } async checkIfShouldStop(metadata: AutomationMetadata): Promise { - if (!metadata.errorCount || !this._repeat) { + if (!metadata.errorCount || !this._job.opts.repeat) { return false } if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { @@ -475,17 +476,13 @@ class Orchestrator { } } -export function execute(input: AutomationEvent, callback: WorkerCallback) { - const appId = input.data.event.appId +export function execute(job: Job, callback: WorkerCallback) { + const appId = job.data.event.appId if (!appId) { throw new Error("Unable to execute, event doesn't contain app ID.") } doInAppContext(appId, async () => { - const automationOrchestrator = new Orchestrator( - input.data.automation, - input.data.event, - input.opts - ) + const automationOrchestrator = new Orchestrator(job) try { const response = await automationOrchestrator.execute() callback(null, response) @@ -495,17 +492,13 @@ export function execute(input: AutomationEvent, callback: WorkerCallback) { }) } -export const removeStalled = async (input: AutomationEvent) => { - const appId = input.data.event.appId +export const removeStalled = async (job: Job) => { + const appId = job.data.event.appId if (!appId) { throw new Error("Unable to execute, event doesn't contain app ID.") } await doInAppContext(appId, async () => { - const automationOrchestrator = new Orchestrator( - input.data.automation, - input.data.event, - input.opts - ) + const automationOrchestrator = new Orchestrator(job) await automationOrchestrator.stopCron("stalled") }) }