1
0
Fork 0
mirror of synced 2024-07-07 15:25:52 +12:00

Type inMemoryQueue

This commit is contained in:
Adria Navarro 2024-03-04 15:38:45 +01:00
parent f98a42fe12
commit 824dd1c1fc
2 changed files with 22 additions and 16 deletions

View file

@ -1,5 +1,6 @@
import events from "events"
import { timeout } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue"
/**
* Bull works with a Job wrapper around all messages that contains a lot more information about
@ -24,9 +25,9 @@ function newJob(queue: string, message: any) {
* It is relatively simple, using an event emitter internally to register when messages are available
* to the consumers - in can support many inputs and many consumers.
*/
class InMemoryQueue {
class InMemoryQueue implements Partial<Queue> {
_name: string
_opts?: any
_opts?: QueueOptions
_messages: any[]
_emitter: EventEmitter
_runCount: number
@ -37,7 +38,7 @@ class InMemoryQueue {
* @param opts This is not used by the in memory queue as there is no real use
* case when in memory, but is the same API as Bull
*/
constructor(name: string, opts?: any) {
constructor(name: string, opts?: QueueOptions) {
this._name = name
this._opts = opts
this._messages = []
@ -55,8 +56,12 @@ class InMemoryQueue {
* note this is incredibly limited compared to Bull as in reality the Job would contain
* a lot more information about the queue and current status of Bull cluster.
*/
process(func: any) {
async process(func: any) {
this._emitter.on("message", async () => {
const delay = this._opts?.defaultJobOptions?.delay
if (delay) {
await new Promise<void>(r => setTimeout(() => r(), delay))
}
if (this._messages.length <= 0) {
return
}
@ -70,7 +75,7 @@ class InMemoryQueue {
}
async isReady() {
return true
return this as any
}
// simply puts a message to the queue and emits to the queue for processing
@ -83,27 +88,26 @@ class InMemoryQueue {
* @param repeat serves no purpose for the import queue.
*/
// eslint-disable-next-line no-unused-vars
add(msg: any, repeat: boolean) {
if (typeof msg !== "object") {
async add(data: any, opts?: JobOptions) {
if (typeof data !== "object") {
throw "Queue only supports carrying JSON."
}
this._messages.push(newJob(this._name, msg))
this._messages.push(newJob(this._name, data))
this._addCount++
this._emitter.emit("message")
return {} as any
}
/**
* replicating the close function from bull, which waits for jobs to finish.
*/
async close() {
return []
}
async close() {}
/**
* This removes a cron which has been implemented, this is part of Bull API.
* @param cronJobId The cron which is to be removed.
*/
removeRepeatableByKey(cronJobId: string) {
async removeRepeatableByKey(cronJobId: string) {
// TODO: implement for testing
console.log(cronJobId)
}
@ -111,12 +115,12 @@ class InMemoryQueue {
/**
* Implemented for tests
*/
getRepeatableJobs() {
async getRepeatableJobs() {
return []
}
// eslint-disable-next-line no-unused-vars
removeJobs(pattern: string) {
async removeJobs(pattern: string) {
// no-op
}
@ -128,12 +132,12 @@ class InMemoryQueue {
}
async getJob() {
return {}
return null
}
on() {
// do nothing
return this
return this as any
}
async waitForCompletion() {

View file

@ -7,6 +7,8 @@ import { addListeners, StalledFn } from "./listeners"
import { Duration } from "../utils"
import * as timers from "../timers"
export { QueueOptions, Queue, JobOptions } from "bull"
// the queue lock is held for 5 minutes
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
// queue lock is refreshed every 30 seconds