1
0
Fork 0
mirror of synced 2024-07-04 22:11:23 +12:00
This commit is contained in:
Adria Navarro 2024-03-07 10:54:26 +01:00
parent bb5b40b61c
commit 977daff05c
2 changed files with 20 additions and 3 deletions

View file

@ -23,7 +23,7 @@ class DocWritethroughProcessor {
docWritethroughProcessorQueue.process(async message => {
const result = await locks.doWithLock(
{
type: LockType.DEFAULT,
type: LockType.TRY_ONCE,
name: LockName.PERSIST_DOC_WRITETHROUGH,
resource: `${message.data.dbName}:${message.data.docId}`,
ttl: Duration.fromSeconds(60).toMs(),

View file

@ -1,8 +1,9 @@
import events from "events"
import { timeout } from "../utils"
import { newid, timeout } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue"
interface JobMessage {
id: string
timestamp: number
queue: string
data: any
@ -20,6 +21,7 @@ interface JobMessage {
*/
function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
return {
id: newid(),
timestamp: Date.now(),
queue: queue,
data: message,
@ -74,8 +76,23 @@ class InMemoryQueue implements Partial<Queue> {
let msg = this._messages.shift()
let resp = func(msg)
async function retryFunc(fnc: any) {
try {
await fnc
} catch (e: any) {
await new Promise<void>(r => setTimeout(() => r(), 50))
await retryFunc(func(msg))
}
}
if (resp.then != null) {
await resp
try {
await retryFunc(resp)
} catch (e: any) {
console.error(e)
}
}
this._runCount++
const jobId = msg?.opts?.jobId?.toString()