1
0
Fork 0
mirror of synced 2024-06-29 11:31:06 +12:00

Fixes for cronjob stop - correctly handle this without stalled job handle.

This commit is contained in:
mike12345567 2022-10-14 13:26:42 +01:00
parent 94188771df
commit 3507704680
5 changed files with 44 additions and 48 deletions

View file

@ -3,24 +3,35 @@ import { JobQueue } from "./constants"
export type StalledFn = (job: Job) => Promise<void> export type StalledFn = (job: Job) => Promise<void>
export const addListeners = ( export function addListeners(
queue: Queue, queue: Queue,
jobQueue: JobQueue, jobQueue: JobQueue,
removeStalled?: StalledFn removeStalled?: StalledFn
) => { ) {
logging(queue, jobQueue) logging(queue, jobQueue)
if (removeStalled) { if (removeStalled) {
handleStalled(queue, removeStalled) handleStalled(queue, removeStalled)
} }
} }
const handleStalled = (queue: Queue, removeStalled: StalledFn) => { function handleStalled(queue: Queue, removeStalled?: StalledFn) {
queue.on("stalled", async (job: Job) => { queue.on("stalled", async (job: Job) => {
await removeStalled(job) if (removeStalled) {
await removeStalled(job)
} else if (job.opts.repeat) {
const jobId = job.id
const repeatJobs = await queue.getRepeatableJobs()
for (let repeatJob of repeatJobs) {
if (repeatJob.id === jobId) {
await queue.removeRepeatableByKey(repeatJob.key)
}
}
console.log(`jobId=${jobId} disabled`)
}
}) })
} }
const logging = (queue: Queue, jobQueue: JobQueue) => { function logging(queue: Queue, jobQueue: JobQueue) {
let eventType: string let eventType: string
switch (jobQueue) { switch (jobQueue) {
case JobQueue.AUTOMATIONS: case JobQueue.AUTOMATIONS:

View file

@ -18,7 +18,7 @@ async function cleanup() {
export function createQueue( export function createQueue(
jobQueue: JobQueue, jobQueue: JobQueue,
removeStalled: StalledFn removeStalled?: StalledFn
): BullQueue.Queue { ): BullQueue.Queue {
const queueConfig: any = redisProtocolUrl || { redis: opts } const queueConfig: any = redisProtocolUrl || { redis: opts }
let queue: any let queue: any

View file

@ -91,9 +91,13 @@ export async function disableAllCrons(appId: any) {
return Promise.all(promises) return Promise.all(promises)
} }
export async function disableCron(jobId: string, jobKey: string) { export async function disableCronById(jobId: number | string) {
await automationQueue.removeRepeatableByKey(jobKey) const repeatJobs = await automationQueue.getRepeatableJobs()
await automationQueue.removeJobs(jobId) for (let repeatJob of repeatJobs) {
if (repeatJob.id === jobId) {
await automationQueue.removeRepeatableByKey(repeatJob.key)
}
}
console.log(`jobId=${jobId} disabled`) console.log(`jobId=${jobId} disabled`)
} }

View file

@ -27,18 +27,6 @@ export interface TriggerOutput {
timestamp?: number timestamp?: number
} }
export interface AutomationEvent {
data: {
automation: Automation
event: any
}
opts?: {
repeat?: {
jobId: string
}
}
}
export interface AutomationContext extends AutomationResults { export interface AutomationContext extends AutomationResults {
steps: any[] steps: any[]
trigger: any trigger: any

View file

@ -1,6 +1,11 @@
import { default as threadUtils } from "./utils" import { default as threadUtils } from "./utils"
import { Job } from "bull"
threadUtils.threadSetup() threadUtils.threadSetup()
import { isRecurring, disableCron, isErrorInOutput } from "../automations/utils" import {
isRecurring,
disableCronById,
isErrorInOutput,
} from "../automations/utils"
import { default as actions } from "../automations/actions" import { default as actions } from "../automations/actions"
import { default as automationUtils } from "../automations/automationUtils" import { default as automationUtils } from "../automations/automationUtils"
import { default as AutomationEmitter } from "../events/AutomationEmitter" import { default as AutomationEmitter } from "../events/AutomationEmitter"
@ -13,7 +18,6 @@ import {
LoopStep, LoopStep,
LoopStepType, LoopStepType,
LoopInput, LoopInput,
AutomationEvent,
TriggerOutput, TriggerOutput,
AutomationContext, AutomationContext,
AutomationMetadata, AutomationMetadata,
@ -73,19 +77,16 @@ class Orchestrator {
_automation: Automation _automation: Automation
_emitter: any _emitter: any
_context: AutomationContext _context: AutomationContext
_repeat?: { jobId: string; jobKey: string } _job: Job
executionOutput: AutomationContext executionOutput: AutomationContext
constructor(automation: Automation, triggerOutput: TriggerOutput, opts: any) { constructor(job: Job) {
let automation = job.data.automation,
triggerOutput = job.data.event
const metadata = triggerOutput.metadata const metadata = triggerOutput.metadata
this._chainCount = metadata ? metadata.automationChainCount : 0 this._chainCount = metadata ? metadata.automationChainCount : 0
this._appId = triggerOutput.appId as string this._appId = triggerOutput.appId as string
if (opts?.repeat) { this._job = job
this._repeat = {
jobId: opts.repeat.jobId,
jobKey: opts.repeat.key,
}
}
const triggerStepId = automation.definition.trigger.stepId const triggerStepId = automation.definition.trigger.stepId
triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput) triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput)
// remove from context // remove from context
@ -134,7 +135,7 @@ class Orchestrator {
} }
async stopCron(reason: string) { async stopCron(reason: string) {
if (!this._repeat) { if (!this._job.opts.repeat) {
return return
} }
logWarn( logWarn(
@ -142,7 +143,7 @@ class Orchestrator {
) )
const automation = this._automation const automation = this._automation
const trigger = automation.definition.trigger const trigger = automation.definition.trigger
await disableCron(this._repeat?.jobId, this._repeat?.jobKey) await disableCronById(this._job.id)
this.updateExecutionOutput( this.updateExecutionOutput(
trigger.id, trigger.id,
trigger.stepId, trigger.stepId,
@ -156,7 +157,7 @@ class Orchestrator {
} }
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> { async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> {
if (!metadata.errorCount || !this._repeat) { if (!metadata.errorCount || !this._job.opts.repeat) {
return false return false
} }
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
@ -475,17 +476,13 @@ class Orchestrator {
} }
} }
export function execute(input: AutomationEvent, callback: WorkerCallback) { export function execute(job: Job, callback: WorkerCallback) {
const appId = input.data.event.appId const appId = job.data.event.appId
if (!appId) { if (!appId) {
throw new Error("Unable to execute, event doesn't contain app ID.") throw new Error("Unable to execute, event doesn't contain app ID.")
} }
doInAppContext(appId, async () => { doInAppContext(appId, async () => {
const automationOrchestrator = new Orchestrator( const automationOrchestrator = new Orchestrator(job)
input.data.automation,
input.data.event,
input.opts
)
try { try {
const response = await automationOrchestrator.execute() const response = await automationOrchestrator.execute()
callback(null, response) callback(null, response)
@ -495,17 +492,13 @@ export function execute(input: AutomationEvent, callback: WorkerCallback) {
}) })
} }
export const removeStalled = async (input: AutomationEvent) => { export const removeStalled = async (job: Job) => {
const appId = input.data.event.appId const appId = job.data.event.appId
if (!appId) { if (!appId) {
throw new Error("Unable to execute, event doesn't contain app ID.") throw new Error("Unable to execute, event doesn't contain app ID.")
} }
await doInAppContext(appId, async () => { await doInAppContext(appId, async () => {
const automationOrchestrator = new Orchestrator( const automationOrchestrator = new Orchestrator(job)
input.data.automation,
input.data.event,
input.opts
)
await automationOrchestrator.stopCron("stalled") await automationOrchestrator.stopCron("stalled")
}) })
} }