diff --git a/packages/backend-core/src/cache/tests/writethrough.spec.ts b/packages/backend-core/src/cache/tests/writethrough.spec.ts index d346788121..a34f05e881 100644 --- a/packages/backend-core/src/cache/tests/writethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/writethrough.spec.ts @@ -1,10 +1,13 @@ -import { structures, DBTestConfiguration } from "../../../tests" +import { + structures, + DBTestConfiguration, + expectFunctionWasCalledTimesWith, +} from "../../../tests" import { Writethrough } from "../writethrough" import { getDB } from "../../db" import tk from "timekeeper" -const START_DATE = Date.now() -tk.freeze(START_DATE) +tk.freeze(Date.now()) const DELAY = 5000 @@ -17,34 +20,99 @@ describe("writethrough", () => { const writethrough = new Writethrough(db, DELAY) const writethrough2 = new Writethrough(db2, DELAY) + const docId = structures.uuid() + + beforeEach(() => { + jest.clearAllMocks() + }) + describe("put", () => { - let first: any + let current: any it("should be able to store, will go to DB", async () => { await config.doInTenant(async () => { - const response = await writethrough.put({ _id: "test", value: 1 }) + const response = await writethrough.put({ + _id: docId, + value: 1, + }) const output = await db.get(response.id) - first = output + current = output expect(output.value).toBe(1) }) }) it("second put shouldn't update DB", async () => { await config.doInTenant(async () => { - const response = await writethrough.put({ ...first, value: 2 }) + const response = await writethrough.put({ ...current, value: 2 }) const output = await db.get(response.id) - expect(first._rev).toBe(output._rev) + expect(current._rev).toBe(output._rev) expect(output.value).toBe(1) }) }) it("should put it again after delay period", async () => { await config.doInTenant(async () => { - tk.freeze(START_DATE + DELAY + 1) - const response = await writethrough.put({ ...first, value: 3 }) + tk.freeze(Date.now() + DELAY + 1) + const response = await writethrough.put({ ...current, value: 3 }) const output = await db.get(response.id) - expect(response.rev).not.toBe(first._rev) + expect(response.rev).not.toBe(current._rev) expect(output.value).toBe(3) + + current = output + }) + }) + + it("should handle parallel DB updates ignoring conflicts", async () => { + await config.doInTenant(async () => { + tk.freeze(Date.now() + DELAY + 1) + const responses = await Promise.all([ + writethrough.put({ ...current, value: 4 }), + writethrough.put({ ...current, value: 4 }), + writethrough.put({ ...current, value: 4 }), + ]) + + const newRev = responses.map(x => x.rev).find(x => x !== current._rev) + expect(newRev).toBeDefined() + expect(responses.map(x => x.rev)).toEqual( + expect.arrayContaining([current._rev, current._rev, newRev]) + ) + expectFunctionWasCalledTimesWith( + console.warn, + 2, + "bb-warn: Ignoring redlock conflict in write-through cache" + ) + + const output = await db.get(current._id) + expect(output.value).toBe(4) + expect(output._rev).toBe(newRev) + + current = output + }) + }) + + it("should handle updates with documents falling behind", async () => { + await config.doInTenant(async () => { + tk.freeze(Date.now() + DELAY + 1) + + const id = structures.uuid() + await writethrough.put({ _id: id, value: 1 }) + const doc = await writethrough.get(id) + + // Updating document + tk.freeze(Date.now() + DELAY + 1) + await writethrough.put({ ...doc, value: 2 }) + + // Update with the old rev value + tk.freeze(Date.now() + DELAY + 1) + const res = await writethrough.put({ + ...doc, + value: 3, + }) + expect(res.ok).toBe(true) + + const output = await db.get(id) + expect(output.value).toBe(3) + expect(output._rev).toBe(res.rev) }) }) }) @@ -52,8 +120,8 @@ describe("writethrough", () => { describe("get", () => { it("should be able to retrieve", async () => { await config.doInTenant(async () => { - const response = await writethrough.get("test") - expect(response.value).toBe(3) + const response = await writethrough.get(docId) + expect(response.value).toBe(4) }) }) }) diff --git a/packages/backend-core/src/cache/writethrough.ts b/packages/backend-core/src/cache/writethrough.ts index dc889d5b18..a3b1ecc08d 100644 --- a/packages/backend-core/src/cache/writethrough.ts +++ b/packages/backend-core/src/cache/writethrough.ts @@ -1,7 +1,8 @@ import BaseCache from "./base" import { getWritethroughClient } from "../redis/init" import { logWarn } from "../logging" -import { Database } from "@budibase/types" +import { Database, Document, LockName, LockType } from "@budibase/types" +import * as locks from "../redis/redlockImpl" const DEFAULT_WRITE_RATE_MS = 10000 let CACHE: BaseCache | null = null @@ -27,44 +28,62 @@ function makeCacheItem(doc: any, lastWrite: number | null = null): CacheItem { return { doc, lastWrite: lastWrite || Date.now() } } -export async function put( +async function put( db: Database, - doc: any, + doc: Document, writeRateMs: number = DEFAULT_WRITE_RATE_MS ) { const cache = await getCache() const key = doc._id - let cacheItem: CacheItem | undefined = await cache.get(makeCacheKey(db, key)) + let cacheItem: CacheItem | undefined + if (key) { + cacheItem = await cache.get(makeCacheKey(db, key)) + } const updateDb = !cacheItem || cacheItem.lastWrite < Date.now() - writeRateMs let output = doc if (updateDb) { - const writeDb = async (toWrite: any) => { - // doc should contain the _id and _rev - const response = await db.put(toWrite) - output = { - ...doc, - _id: response.id, - _rev: response.rev, - } - } - try { - await writeDb(doc) - } catch (err: any) { - if (err.status !== 409) { - throw err - } else { - // Swallow 409s but log them - logWarn(`Ignoring conflict in write-through cache`) + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH, + resource: key, + ttl: 1000, + }, + async () => { + const writeDb = async (toWrite: any) => { + // doc should contain the _id and _rev + const response = await db.put(toWrite, { force: true }) + output = { + ...doc, + _id: response.id, + _rev: response.rev, + } + } + try { + await writeDb(doc) + } catch (err: any) { + if (err.status !== 409) { + throw err + } else { + // Swallow 409s but log them + logWarn(`Ignoring conflict in write-through cache`) + } + } } + ) + if (!lockResponse.executed) { + logWarn(`Ignoring redlock conflict in write-through cache`) } } // if we are updating the DB then need to set the lastWrite to now cacheItem = makeCacheItem(output, updateDb ? null : cacheItem?.lastWrite) - await cache.store(makeCacheKey(db, key), cacheItem) + if (output._id) { + await cache.store(makeCacheKey(db, output._id), cacheItem) + } return { ok: true, id: output._id, rev: output._rev } } -export async function get(db: Database, id: string): Promise { +async function get(db: Database, id: string): Promise { const cache = await getCache() const cacheKey = makeCacheKey(db, id) let cacheItem: CacheItem = await cache.get(cacheKey) @@ -76,11 +95,7 @@ export async function get(db: Database, id: string): Promise { return cacheItem.doc } -export async function remove( - db: Database, - docOrId: any, - rev?: any -): Promise { +async function remove(db: Database, docOrId: any, rev?: any): Promise { const cache = await getCache() if (!docOrId) { throw new Error("No ID/Rev provided.") diff --git a/packages/backend-core/src/redis/redlockImpl.ts b/packages/backend-core/src/redis/redlockImpl.ts index 136d7f5d33..5e71488689 100644 --- a/packages/backend-core/src/redis/redlockImpl.ts +++ b/packages/backend-core/src/redis/redlockImpl.ts @@ -24,7 +24,7 @@ const getClient = async (type: LockType): Promise => { } } -export const OPTIONS = { +const OPTIONS = { TRY_ONCE: { // immediately throws an error if the lock is already held retryCount: 0, @@ -56,14 +56,29 @@ export const OPTIONS = { }, } -export const newRedlock = async (opts: Options = {}) => { +const newRedlock = async (opts: Options = {}) => { let options = { ...OPTIONS.DEFAULT, ...opts } const redisWrapper = await getLockClient() const client = redisWrapper.getClient() return new Redlock([client], options) } -export const doWithLock = async (opts: LockOptions, task: any) => { +type SuccessfulRedlockExecution = { + executed: true + result: T +} +type UnsuccessfulRedlockExecution = { + executed: false +} + +type RedlockExecution = + | SuccessfulRedlockExecution + | UnsuccessfulRedlockExecution + +export const doWithLock = async ( + opts: LockOptions, + task: () => Promise +): Promise> => { const redlock = await getClient(opts.type) let lock try { @@ -73,8 +88,8 @@ export const doWithLock = async (opts: LockOptions, task: any) => { let name: string = `lock:${prefix}_${opts.name}` // add additional unique name if required - if (opts.nameSuffix) { - name = name + `_${opts.nameSuffix}` + if (opts.resource) { + name = name + `_${opts.resource}` } // create the lock @@ -83,7 +98,7 @@ export const doWithLock = async (opts: LockOptions, task: any) => { // perform locked task // need to await to ensure completion before unlocking const result = await task() - return result + return { executed: true, result } } catch (e: any) { console.warn("lock error") // lock limit exceeded @@ -92,7 +107,7 @@ export const doWithLock = async (opts: LockOptions, task: any) => { // don't throw for try-once locks, they will always error // due to retry count (0) exceeded console.warn(e) - return + return { executed: false } } else { console.error(e) throw e diff --git a/packages/backend-core/tests/utilities/index.ts b/packages/backend-core/tests/utilities/index.ts index efe014908b..1c73216d76 100644 --- a/packages/backend-core/tests/utilities/index.ts +++ b/packages/backend-core/tests/utilities/index.ts @@ -4,4 +4,6 @@ export { generator } from "./structures" export * as testEnv from "./testEnv" export * as testContainerUtils from "./testContainerUtils" +export * from "./jestUtils" + export { default as DBTestConfiguration } from "./DBTestConfiguration" diff --git a/packages/backend-core/tests/utilities/jestUtils.ts b/packages/backend-core/tests/utilities/jestUtils.ts new file mode 100644 index 0000000000..d84eac548c --- /dev/null +++ b/packages/backend-core/tests/utilities/jestUtils.ts @@ -0,0 +1,9 @@ +export function expectFunctionWasCalledTimesWith( + jestFunction: any, + times: number, + argument: any +) { + expect( + jestFunction.mock.calls.filter((call: any) => call[0] === argument).length + ).toBe(times) +} diff --git a/packages/backend-core/tests/utilities/structures/db.ts b/packages/backend-core/tests/utilities/structures/db.ts index e25b707cb9..f4a677e777 100644 --- a/packages/backend-core/tests/utilities/structures/db.ts +++ b/packages/backend-core/tests/utilities/structures/db.ts @@ -1,5 +1,12 @@ +import { structures } from ".." import { newid } from "../../../src/newid" export function id() { return `db_${newid()}` } + +export function rev() { + return `${structures.generator.character({ + numeric: true, + })}-${structures.uuid().replace(/-/, "")}` +} diff --git a/packages/types/src/sdk/locks.ts b/packages/types/src/sdk/locks.ts index d868691891..3f6f03b811 100644 --- a/packages/types/src/sdk/locks.ts +++ b/packages/types/src/sdk/locks.ts @@ -13,6 +13,7 @@ export enum LockName { TRIGGER_QUOTA = "trigger_quota", SYNC_ACCOUNT_LICENSE = "sync_account_license", UPDATE_TENANTS_DOC = "update_tenants_doc", + PERSIST_WRITETHROUGH = "persist_writethrough", } export interface LockOptions { @@ -29,9 +30,9 @@ export interface LockOptions { */ ttl: number /** - * The suffix to add to the lock name for additional uniqueness + * The individual resource to lock. This is useful for locking around very specific identifiers, e.g. a document that is prone to conflicts */ - nameSuffix?: string + resource?: string /** * This is a system-wide lock - don't use tenancy in lock key */