1
0
Fork 0
mirror of synced 2024-07-29 10:05:55 +12:00

Use message id to handle concurrency

This commit is contained in:
Adria Navarro 2024-03-05 23:32:02 +01:00
parent 6707da4ac2
commit 0a2fb4a3a6

View file

@ -1,9 +1,11 @@
import BaseCache from "./base" import BaseCache from "./base"
import { getDocWritethroughClient } from "../redis/init" import { getDocWritethroughClient } from "../redis/init"
import { AnyDocument, Database } from "@budibase/types" import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
import { JobQueue, createQueue } from "../queue" import { JobQueue, createQueue } from "../queue"
import * as dbUtils from "../db" import * as dbUtils from "../db"
import { Duration, newid } from "../utils"
import { context, locks } from ".."
let CACHE: BaseCache | null = null let CACHE: BaseCache | null = null
async function getCache() { async function getCache() {
@ -18,6 +20,7 @@ interface ProcessDocMessage {
dbName: string dbName: string
docId: string docId: string
cacheKeyPrefix: string cacheKeyPrefix: string
messageId: string
} }
export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>( export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
@ -25,11 +28,46 @@ export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
) )
docWritethroughProcessorQueue.process(async message => { docWritethroughProcessorQueue.process(async message => {
await persistToDb(message.data) const { cacheKeyPrefix, messageId } = message.data
const cache = await getCache()
const latestMessageId = await cache.get(
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID
)
if (messageId !== latestMessageId) {
// Nothing to do, another message overrode it
return
}
const lockResponse = await locks.doWithLock(
{
type: LockType.TRY_ONCE,
name: LockName.PERSIST_WRITETHROUGH,
resource: cacheKeyPrefix,
ttl: Duration.fromSeconds(60).toMs(),
},
async () => {
const latestMessageId = await cache.get(
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID
)
if (messageId !== latestMessageId) {
// Nothing to do, another message overrode it
return
}
await persistToDb(cache, message.data)
console.log("DocWritethrough persisted", { data: message.data }) console.log("DocWritethrough persisted", { data: message.data })
}
)
if (!lockResponse.executed) {
console.log(`Ignoring redlock conflict in write-through cache`)
}
}) })
export async function persistToDb({ export async function persistToDb(
cache: BaseCache,
{
dbName, dbName,
docId, docId,
cacheKeyPrefix, cacheKeyPrefix,
@ -37,9 +75,8 @@ export async function persistToDb({
dbName: string dbName: string
docId: string docId: string
cacheKeyPrefix: string cacheKeyPrefix: string
}) { }
const cache = await getCache() ) {
const db = dbUtils.getDB(dbName) const db = dbUtils.getDB(dbName)
let doc: AnyDocument | undefined let doc: AnyDocument | undefined
try { try {
@ -48,7 +85,9 @@ export async function persistToDb({
doc = { _id: docId } doc = { _id: docId }
} }
const keysToPersist = await cache.keys(`${cacheKeyPrefix}:data:*`) const keysToPersist = await cache.keys(
REDIS_KEYS(cacheKeyPrefix).DATA.GET_ALL
)
for (const key of keysToPersist) { for (const key of keysToPersist) {
const data = await cache.get(key, { useTenancy: false }) const data = await cache.get(key, { useTenancy: false })
doc[data.key] = data.value doc[data.key] = data.value
@ -83,27 +122,38 @@ export class DocWritethrough {
const cache = await getCache() const cache = await getCache()
await this.storeToCache(cache, data) await this.storeToCache(cache, data)
const messageId = newid()
await cache.store(
REDIS_KEYS(this.cacheKeyPrefix).LATEST_MESSAGE_ID,
messageId
)
docWritethroughProcessorQueue.add( docWritethroughProcessorQueue.add(
{ {
dbName: this.db.name, dbName: this.db.name,
docId: this.docId, docId: this.docId,
cacheKeyPrefix: this.cacheKeyPrefix, cacheKeyPrefix: this.cacheKeyPrefix,
messageId,
}, },
{ {
delay: this.writeRateMs, delay: this.writeRateMs,
jobId: this.cacheKeyPrefix,
removeOnFail: true,
removeOnComplete: true,
} }
) )
} }
private async storeToCache(cache: BaseCache, data: Record<string, any>) { private async storeToCache(cache: BaseCache, data: Record<string, any>) {
data = Object.entries(data).reduce((acc, [key, value]) => { data = Object.entries(data).reduce((acc, [key, value]) => {
acc[this.cacheKeyPrefix + ":data:" + key] = { key, value } acc[REDIS_KEYS(this.cacheKeyPrefix).DATA.VALUE(key)] = { key, value }
return acc return acc
}, {} as Record<string, any>) }, {} as Record<string, any>)
await cache.bulkStore(data, null) await cache.bulkStore(data, null)
} }
} }
const REDIS_KEYS = (prefix: string) => ({
DATA: {
VALUE: (key: string) => prefix + ":data:" + key,
GET_ALL: prefix + ":data:*",
},
LATEST_MESSAGE_ID: prefix + ":info:latestMessageId",
})