diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index a0bc14ec5c..3f84f82bc4 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -1,9 +1,11 @@ import BaseCache from "./base" import { getDocWritethroughClient } from "../redis/init" -import { AnyDocument, Database } from "@budibase/types" +import { AnyDocument, Database, LockName, LockType } from "@budibase/types" import { JobQueue, createQueue } from "../queue" import * as dbUtils from "../db" +import { Duration, newid } from "../utils" +import { context, locks } from ".." let CACHE: BaseCache | null = null async function getCache() { @@ -18,6 +20,7 @@ interface ProcessDocMessage { dbName: string docId: string cacheKeyPrefix: string + messageId: string } export const docWritethroughProcessorQueue = createQueue( @@ -25,21 +28,55 @@ export const docWritethroughProcessorQueue = createQueue( ) docWritethroughProcessorQueue.process(async message => { - await persistToDb(message.data) - console.log("DocWritethrough persisted", { data: message.data }) + const { cacheKeyPrefix, messageId } = message.data + + const cache = await getCache() + const latestMessageId = await cache.get( + REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID + ) + if (messageId !== latestMessageId) { + // Nothing to do, another message overrode it + return + } + + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH, + resource: cacheKeyPrefix, + ttl: Duration.fromSeconds(60).toMs(), + }, + async () => { + const latestMessageId = await cache.get( + REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID + ) + if (messageId !== latestMessageId) { + // Nothing to do, another message overrode it + return + } + + await persistToDb(cache, message.data) + console.log("DocWritethrough persisted", { data: message.data }) + } + ) + + if (!lockResponse.executed) { + console.log(`Ignoring redlock conflict in write-through cache`) + } }) -export async function persistToDb({ - dbName, - docId, - cacheKeyPrefix, -}: { - dbName: string - docId: string - cacheKeyPrefix: string -}) { - const cache = await getCache() - +export async function persistToDb( + cache: BaseCache, + { + dbName, + docId, + cacheKeyPrefix, + }: { + dbName: string + docId: string + cacheKeyPrefix: string + } +) { const db = dbUtils.getDB(dbName) let doc: AnyDocument | undefined try { @@ -48,7 +85,9 @@ export async function persistToDb({ doc = { _id: docId } } - const keysToPersist = await cache.keys(`${cacheKeyPrefix}:data:*`) + const keysToPersist = await cache.keys( + REDIS_KEYS(cacheKeyPrefix).DATA.GET_ALL + ) for (const key of keysToPersist) { const data = await cache.get(key, { useTenancy: false }) doc[data.key] = data.value @@ -83,27 +122,38 @@ export class DocWritethrough { const cache = await getCache() await this.storeToCache(cache, data) + const messageId = newid() + await cache.store( + REDIS_KEYS(this.cacheKeyPrefix).LATEST_MESSAGE_ID, + messageId + ) docWritethroughProcessorQueue.add( { dbName: this.db.name, docId: this.docId, cacheKeyPrefix: this.cacheKeyPrefix, + messageId, }, { delay: this.writeRateMs, - jobId: this.cacheKeyPrefix, - removeOnFail: true, - removeOnComplete: true, } ) } private async storeToCache(cache: BaseCache, data: Record) { data = Object.entries(data).reduce((acc, [key, value]) => { - acc[this.cacheKeyPrefix + ":data:" + key] = { key, value } + acc[REDIS_KEYS(this.cacheKeyPrefix).DATA.VALUE(key)] = { key, value } return acc }, {} as Record) await cache.bulkStore(data, null) } } + +const REDIS_KEYS = (prefix: string) => ({ + DATA: { + VALUE: (key: string) => prefix + ":data:" + key, + GET_ALL: prefix + ":data:*", + }, + LATEST_MESSAGE_ID: prefix + ":info:latestMessageId", +})