diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index cee272cef6..c8b28b3877 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -23,7 +23,7 @@ class DocWritethroughProcessor { docWritethroughProcessorQueue.process(async message => { const result = await locks.doWithLock( { - type: LockType.DEFAULT, + type: LockType.TRY_ONCE, name: LockName.PERSIST_DOC_WRITETHROUGH, resource: `${message.data.dbName}:${message.data.docId}`, ttl: Duration.fromSeconds(60).toMs(), diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 985501bcbe..afb5592562 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -1,8 +1,9 @@ import events from "events" -import { timeout } from "../utils" +import { newid, timeout } from "../utils" import { Queue, QueueOptions, JobOptions } from "./queue" interface JobMessage { + id: string timestamp: number queue: string data: any @@ -20,6 +21,7 @@ interface JobMessage { */ function newJob(queue: string, message: any, opts?: JobOptions): JobMessage { return { + id: newid(), timestamp: Date.now(), queue: queue, data: message, @@ -74,8 +76,23 @@ class InMemoryQueue implements Partial { let msg = this._messages.shift() let resp = func(msg) + + async function retryFunc(fnc: any) { + try { + await fnc + } catch (e: any) { + await new Promise(r => setTimeout(() => r(), 50)) + + await retryFunc(func(msg)) + } + } + if (resp.then != null) { - await resp + try { + await retryFunc(resp) + } catch (e: any) { + console.error(e) + } } this._runCount++ const jobId = msg?.opts?.jobId?.toString()