2022-10-14 05:27:04 +13:00
|
|
|
import env from "../environment"
|
|
|
|
import { getRedisOptions } from "../redis/utils"
|
|
|
|
import { JobQueue } from "./constants"
|
|
|
|
import InMemoryQueue from "./inMemoryQueue"
|
2023-11-04 02:05:23 +13:00
|
|
|
import BullQueue, { QueueOptions } from "bull"
|
2022-10-14 05:55:05 +13:00
|
|
|
import { addListeners, StalledFn } from "./listeners"
|
2023-11-04 02:05:23 +13:00
|
|
|
import { Duration } from "../utils"
|
2023-03-28 07:38:49 +13:00
|
|
|
import * as timers from "../timers"
|
2022-10-14 05:27:04 +13:00
|
|
|
|
2023-11-04 02:05:23 +13:00
|
|
|
// how long a job's lock is held before it expires (5 minutes); passed to
// Bull as `settings.lockDuration`
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
// how often a held lock is renewed (30 seconds); passed to Bull as
// `settings.lockRenewTime`
// NOTE(review): "INTERNAL" looks like a typo for "INTERVAL" - the name is left
// unchanged because createQueue references it
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
// clean up the queues every 60 seconds; the same value is used as the grace
// period handed to `queue.clean`
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
// every queue created through createQueue, kept so that they can be cleaned
// periodically and closed on shutdown
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
// handle of the periodic cleanup timer; undefined until the first queue is
// created outside of tests
let cleanupInterval: NodeJS.Timeout | undefined
|
|
|
|
|
|
|
|
/**
 * Cleans completed jobs out of every registered queue, one queue at a time,
 * using CLEANUP_PERIOD_MS as the grace period passed to `clean`.
 * A rejection from any queue propagates to the caller and stops the pass.
 */
async function cleanup() {
  for (const registered of QUEUES) {
    await registered.clean(CLEANUP_PERIOD_MS, "completed")
  }
}
|
|
|
|
|
2022-10-15 07:24:03 +13:00
|
|
|
/**
 * Creates a job queue, attaches the standard listeners to it and registers it
 * so that it is cleaned periodically and closed on shutdown.
 *
 * Outside of tests a Bull queue is created with the Redis options returned by
 * `getRedisOptions()`; in tests an `InMemoryQueue` is created instead. The
 * first queue created outside of tests also starts the periodic cleanup timer
 * and triggers an immediate cleanup pass.
 *
 * @param jobQueue the queue to create.
 * @param opts optional settings; `removeStalledCb` is forwarded to
 *   `addListeners`.
 * @returns the newly created queue.
 */
export function createQueue<T>(
  jobQueue: JobQueue,
  opts: { removeStalledCb?: StalledFn } = {}
): BullQueue.Queue<T> {
  const { opts: redisOpts } = getRedisOptions()
  const queueConfig: QueueOptions = {
    redis: redisOpts,
    settings: {
      // never re-process a job whose lock was lost
      maxStalledCount: 0,
      lockDuration: QUEUE_LOCK_MS,
      lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
    },
  }
  // typed as `any` so that either implementation can be handed back as a
  // BullQueue.Queue<T>
  const queue: any = env.isTest()
    ? new InMemoryQueue(jobQueue, queueConfig)
    : new BullQueue(jobQueue, queueConfig)
  addListeners(queue, jobQueue, opts?.removeStalledCb)
  QUEUES.push(queue)
  if (!cleanupInterval && !env.isTest()) {
    // handle rejections here, a failed periodic pass must not surface as an
    // unhandled promise rejection
    cleanupInterval = timers.set(() => {
      return cleanup().catch(err => {
        console.error(`Unable to cleanup queues - ${err}`)
      })
    }, CLEANUP_PERIOD_MS)
    // fire off an initial cleanup
    cleanup().catch(err => {
      console.error(`Unable to cleanup automation queue initially - ${err}`)
    })
  }
  return queue
}
|
|
|
|
|
2022-11-26 08:57:07 +13:00
|
|
|
/**
 * Stops the periodic cleanup timer and closes every registered queue.
 *
 * Every queue is attempted even when an earlier one fails to close, and the
 * registry is always emptied afterwards. Each failure is logged; if any
 * occurred the first one is rethrown once all queues have been attempted, so
 * callers still observe a rejection.
 *
 * NOTE(review): the timer handle is cleared but not reset, so a queue created
 * after shutdown will not restart the periodic cleanup - confirm whether
 * re-initialisation after shutdown needs to be supported.
 */
export async function shutdown() {
  if (cleanupInterval) {
    timers.clear(cleanupInterval)
  }
  const failures: unknown[] = []
  for (const queue of QUEUES) {
    try {
      await queue.close()
    } catch (err) {
      // keep going so that one bad queue cannot leave the others open
      console.error(`Unable to close queue during shutdown - ${err}`)
      failures.push(err)
    }
  }
  QUEUES = []
  if (failures.length) {
    throw failures[0]
  }
  console.log("Queues shutdown")
}
|