diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index af2ec6dbaa..a8add7ecb6 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -36,7 +36,7 @@ class InMemoryQueue { * @param opts This is not used by the in memory queue as there is no real use * case when in memory, but is the same API as Bull */ - constructor(name: string, opts = null) { + constructor(name: string, opts?: any) { this._name = name this._opts = opts this._messages = [] diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts index 0658147709..c6b02921bf 100644 --- a/packages/backend-core/src/queue/queue.ts +++ b/packages/backend-core/src/queue/queue.ts @@ -2,10 +2,16 @@ import env from "../environment" import { getRedisOptions } from "../redis/utils" import { JobQueue } from "./constants" import InMemoryQueue from "./inMemoryQueue" -import BullQueue from "bull" +import BullQueue, { QueueOptions } from "bull" import { addListeners, StalledFn } from "./listeners" import * as timers from "../timers" +import * as Redis from "ioredis" +// the queue lock is held for 5 minutes +const QUEUE_LOCK_MS = 300000 +// queue lock is refreshed every 30 seconds +const QUEUE_LOCK_RENEW_INTERNAL_MS = 30000 +// cleanup the queue every 60 seconds const CLEANUP_PERIOD_MS = 60 * 1000 let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = [] let cleanupInterval: NodeJS.Timeout @@ -21,7 +27,14 @@ export function createQueue( opts: { removeStalledCb?: StalledFn } = {} ): BullQueue.Queue { const { opts: redisOpts, redisProtocolUrl } = getRedisOptions() - const queueConfig: any = redisProtocolUrl || { redis: redisOpts } + const queueConfig: QueueOptions = { + redis: redisProtocolUrl! || (redisOpts as Redis.RedisOptions), + settings: { + maxStalledCount: 0, + lockDuration: QUEUE_LOCK_MS, + lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS, + }, + } let queue: any if (!env.isTest()) { queue = new BullQueue(jobQueue, queueConfig)